设计模式一:生产者消费者模式(及LinkedBlockingQueue的介绍使用)
2018-03-26 16:44
519 查看
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
实例代码2:
四、LinkedBlockingQueue介绍使用在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
常用API:offer将元素插入队列,成功返回true,如果当前没有可用的空间,则返回falseoffer(E e, long timeout, TimeUnit unit) 将元素插入队列,在到达指定的等待时间前等待可用的空间E poll(long timeout, TimeUnit unit) 获取并移除队列的头部,在指定的等待时间前等待可用的元素void put(E e) 将元素插入队列,将等待可用的空间(堵塞)take() 获取并移除队列的头部,在元素变得可用之前一直等待(堵塞) //offer方法为非堵塞的
//queue.offer(rnd.nextInt(100), 1, TimeUnit.SECONDS); //等待1秒后还不能加入队列则返回失败,放弃加入
//queue.offer(rnd.nextInt(100));
//poll方法为非堵塞的
//Integer value = queue.poll(1, TimeUnit.SECONDS); //等待1秒后还没有数据可取则返回失败,放弃获取
//Integer value = queue.poll();
总结:在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。
另外他们都是线程安全的,不用考虑线程同步问题。
一、为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。二、什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
三、生产者消费者模式实战
实例代码1:package cn.thread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * 多线程模拟实现生产者/消费者模型 * * @author tkhoon */ public class BlockingQueueTest2 { /** * * 定义装苹果的篮子 * */ public class Basket { // 篮子,能够容纳3个苹果 BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3); // 生产苹果,放入篮子 public void produce() throws InterruptedException { // put方法放入一个苹果,若basket满了,等到basket有位置 basket.put("An apple"); } // 消费苹果,从篮子中取走 public String consume() throws InterruptedException { // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部) return basket.take(); } } // 定义苹果生产者 class Producer implements Runnable { private String instance; private Basket basket; public Producer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { 4000 while (true) { // 生产苹果 System.out.println("生产者准备生产苹果:" + instance); basket.produce(); System.out.println("!生产者生产苹果完毕:" + instance); // 休眠300ms Thread.sleep(300); } } catch (InterruptedException ex) { System.out.println("Producer Interrupted"); } } } // 定义苹果消费者 class Consumer implements Runnable { private String instance; private Basket basket; public Consumer(String instance, Basket basket) { this.instance = instance; this.basket = basket; } public void run() { try { while (true) { // 消费苹果 System.out.println("消费者准备消费苹果:" + instance); System.out.println(basket.consume()); System.out.println("!消费者消费苹果完毕:" + instance); // 休眠1000ms Thread.sleep(1000); } } catch (InterruptedException ex) { System.out.println("Consumer Interrupted"); } } } public static void main(String[] args) { BlockingQueueTest2 test = new BlockingQueueTest2(); // 建立一个装苹果的篮子 Basket basket = test.new Basket(); ExecutorService service = Executors.newCachedThreadPool(); Producer producer = test.new Producer("生产者001", basket); Producer producer2 = test.new Producer("生产者002", basket); Consumer consumer = test.new Consumer("消费者001", basket); service.submit(producer); service.submit(producer2); service.submit(consumer); // 程序运行5s后,所有任务停止 // try { // Thread.sleep(1000 * 5); // } catch (InterruptedException e) { // e.printStackTrace(); // } // service.shutdownNow(); } }代码中关于LinkedBlockingQueue的介绍会在稍后进行介绍。
实例代码2:
public class QuickEmailToWikiExtractor extends AbstractExtractor { private ThreadPoolExecutor threadsPool; private ArticleBlockingQueue<ExchangeEmailShallowDTO> emailQueue; public QuickEmailToWikiExtractor() { emailQueue= new ArticleBlockingQueue<ExchangeEmailShallowDTO>(); int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000)); } //执行的主方法 public void extract() { logger.debug("开始" + getExtractorName() + "。。"); long start = System.currentTimeMillis(); //抽取所有邮件放到队列里(生产者) new ExtractEmailTask().start(); // 把队列里的文章插入到Wiki(消费者) insertToWiki(); long end = System.currentTimeMillis(); double cost = (end - start) / 1000; logger.debug("完成" + getExtractorName() + ",花费时间:" + cost + "秒"); } /** * 把队列里的文章插入到Wiki */ private void insertToWiki() { //登录wiki,每间隔一段时间需要登录一次 confluenceService.login(RuleFactory.USER_NAME, RuleFactory.PASSWORD); while (true) { //2秒内取不到就退出 ExchangeEmailShallowDTO email = emailQueue.poll(2, TimeUnit.SECONDS); if (email == null) { break; } threadsPool.submit(new insertToWikiTask(email)); } } protected List<Article> extractEmail() { List<ExchangeEmailShallowDTO> allEmails = getEmailService().queryAllEmails(); if (allEmails == null) { return null; } for (ExchangeEmailShallowDTO exchangeEmailShallowDTO : allEmails) { emailQueue.offer(exchangeEmailShallowDTO); } return null; } /** * 抽取邮件任务(生产者类) * * @author tengfei.fangtf */ public class ExtractEmailTask extends Thread { public void run() { extractEmail(); } } }
四、LinkedBlockingQueue介绍使用在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。注:什么叫线程安全?这个首先要明确。线程安全就是说多线程访问同一代码,不会产生不确定的结果。由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选,LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
常用API:offer将元素插入队列,成功返回true,如果当前没有可用的空间,则返回falseoffer(E e, long timeout, TimeUnit unit) 将元素插入队列,在到达指定的等待时间前等待可用的空间E poll(long timeout, TimeUnit unit) 获取并移除队列的头部,在指定的等待时间前等待可用的元素void put(E e) 将元素插入队列,将等待可用的空间(堵塞)take() 获取并移除队列的头部,在元素变得可用之前一直等待(堵塞) //offer方法为非堵塞的
//queue.offer(rnd.nextInt(100), 1, TimeUnit.SECONDS); //等待1秒后还不能加入队列则返回失败,放弃加入
//queue.offer(rnd.nextInt(100));
//poll方法为非堵塞的
//Integer value = queue.poll(1, TimeUnit.SECONDS); //等待1秒后还没有数据可取则返回失败,放弃获取
//Integer value = queue.poll();
总结:在并发编程中,一般推荐使用阻塞队列,这样实现可以尽量地避免程序出现意外的错误。阻塞队列使用最经典的场景就是socket客户端数据的读取和解析,读取数据的线程不断将数据放入队列,然后解析线程不断从队列取数据解析。还有其他类似的场景,只要符合生产者-消费者模型的都可以使用阻塞队列。使用非阻塞队列,虽然能即时返回结果(消费结果),但必须自行编码解决返回为空的情况处理(以及消费重试等问题)。
另外他们都是线程安全的,不用考虑线程同步问题。
相关文章推荐
- JSF框架中使用的设计模式介绍
- JSF框架中使用的设计模式介绍
- 在Swift中使用Cocoa的现有设计模式介绍
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍
- c#委托和事件的简单使用,简单介绍observer设计模式
- 详细介绍JSF框架技术中使用的设计模式
- 构建高性能服务(三)Java高性能缓冲设计 vs Disruptor vs LinkedBlockingQueue--转载
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍 Observer(观察器)
- 并发队列ConcurrentLinkedQueue和阻塞队列LinkedBlockingQueue使用场景总结
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍 Front Controller(前端控制器)
- WEB互动的革命 - JSF框架中使用的设计模式介绍
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍 Page Cache(页面缓存)
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍 PageController(页面控制器)
- 介绍JSF框架中使用的设计模式
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍 Page Cache(页面缓存)
- JSF框架中使用的设计模式介绍
- 使用Micrisoft.net设计方案 第三章Web表示模式 Web模式集群详细介绍 Intercepting Filter(截取筛选器)
- JSF框架中使用的设计模式介绍
- Java高性能缓冲设计 vs Disruptor vs LinkedBlockingQueue