blockingQueue实现消费-生产模式
2017-03-19 16:55
337 查看
blockingQueue实为阻塞队列,数据由消费者线程、生产者线程共享。消费者产生数据放入阻塞队列,倘若阻塞队列已满,则生产者线程被阻塞直到消费者线程取出数据或者是直接返回失败,若队列数据为空,则消费者线程阻塞直到生产者线程放入数据或者直接返回失败。且阻塞队列的数据为先进先出的规则。
常用api
blockingQueue常用实现类
阻塞队列使用场景
使用ArrayBlockingQueue实现消费-生产情景
package TestExample;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
public class TestExample{
}
运行结果:
常用api
offer(E e) 向阻塞队列放入数据,倘若阻塞队列已满,则直接返回false。 offer(E e, long timeout, TimeUnit unit) 向阻塞队列放入数据,倘若timeout时间过后数据还未放入队列,则直接返回false。 put(E e) 向阻塞队列放入数据,倘若阻塞队列已满,则生产线程阻塞直到消费线程取出数据。 poll() 向阻塞队列取出数据,倘若阻塞队列没有数据,则直接返回false。 poll(long timeout, TimeUnit unit) 向阻塞队列取出数据,倘若timeout时间过后依然没有取出数据,则直接返回false。 take() 向阻塞队列放入数据,倘若阻塞队列已满,则消费线程阻塞直到生产线程放入数据。 drainTo():一次性从阻塞队列所有可用的数据对象。
blockingQueue常用实现类
ArrayBlockingQueue 采用数组来放入存储数据。在初始化的时候需要指定数组长度。这就限定了ArrayBlockingQueue 只能存储指定长度的数据。并且ArrayBlockingQueue 的放入与取出不能并发进行,因为ArrayBlockingQueue 采用唯一的锁来维护存放队列的数组。适合生产线程产生的数据快于消费线程消费数据的场景。 LinkedBlockingQueue 采用链表来维护数据队列。因为链表增加与取出数据对象是作用在不同对象上,所以取出与增加是可以并发进行的。但是LinkedBlockingQueue 初始化的时候可以不限定长 4000 度,这就导致了LinkedBlockingQueue 可以存储接近于无穷的数据,这可能会导致内存溢出。所有LinkedBlockingQueue 适合生产线程产生的数据慢于消费线程消费数据的场景。
阻塞队列使用场景
列如用户上传了一个数据量很大的文件,后台解析需要耗费十几秒,这时总不能让用户等待十几秒吧,这时候只需将文件数据放入阻塞队列,然后放回给用户成功,接着生产线程去处理这些数据。 例如12306开票的时候大量用户涌进来买票,这时候由于高并发量太大,导致很多业务逻辑需要同时处理,数据库io操作过于密集频繁,这时候可以把用户的请求数据放入消息队列进行排队,相关接口去取数据用于业务操作。 例如12306订票成功后需要发送邮件通知给用户,由于12306的系统能应对很强大的高并发场景,但是邮件系统不能处理大量高并发。由于发送邮件可以延迟,所有只需要把发送邮件的内容放入阻塞队列,发邮件线程去取数据发送即可。
使用ArrayBlockingQueue实现消费-生产情景
package TestExample;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
public class TestExample{
ArrayBlockingQueue<String> test = new ArrayBlockingQueue<String>(10); CountDownLatch latch = new CountDownLatch(2); @Test public void test(){ ExecutorService service = Executors.newFixedThreadPool(2); service.execute(new ProductTread(test)); service.execute(new ConsumeTread(test)); try { latch.await(); System.out.println("主线程结束!"); } catch (InterruptedException e) { e.printStackTrace(); } } class ProductTread implements Runnable{ BlockingQueue<String> bq; public void run() { for(int i=0;i<20;i++){ try { bq.put(String.valueOf(i)); System.out.println("生产者放入数据:"+String.valueOf(i)); } catch (InterruptedException e) { e.printStackTrace(); } } latch.countDown(); } ProductTread(BlockingQueue<String> bq){ this.bq=bq; } } class ConsumeTread implements Runnable{ BlockingQueue<String> bq; public void run() { for(int i=0;i<20;i++){ try { Thread.sleep(1000); System.out.println("消费者取出数据"+bq.take()); } catch (InterruptedException e) { e.printStackTrace(); } } latch.countDown(); } ConsumeTread(BlockingQueue<String> bq){ this.bq=bq; } }
}
运行结果:
生产者放入数据:0 生产者放入数据:1 生产者放入数据:2 生产者放入数据:3 生产者放入数据:4 生产者放入数据:5 生产者放入数据:6 生产者放入数据:7 生产者放入数据:8 生产者放入数据:9 消费者取出数据0 生产者放入数据:10 消费者取出数据1 生产者放入数据:11 消费者取出数据2 生产者放入数据:12 消费者取出数据3 生产者放入数据:13 消费者取出数据4 生产者放入数据:14 消费者取出数据5 生产者放入数据:15 消费者取出数据6 生产者放入数据:16 消费者取出数据7 生产者放入数据:17 消费者取出数据8 生产者放入数据:18 消费者取出数据9 生产者放入数据:19 消费者取出数据10 消费者取出数据11 消费者取出数据12 消费者取出数据13 消费者取出数据14 消费者取出数据15 消费者取出数据16 消费者取出数据17 消费者取出数据18 消费者取出数据19 主线程结束!
相关文章推荐
- 生产消费模式的几种实现
- 多线程初探之使用Lock实现生产-消费模式
- 关于java多线程浅析一:简单实现生产消费模式
- GCD 实现生产-消费 模式
- 使用Lock锁机制实现 多线程的 多生产 多消费 模式。
- 使用C#的泛型队列Queue实现生产消费模式
- 通过pthread_mutex_lock和pthread_cond_wait实现生产消费模式,并且生产一次消费一次
- 整理修改的一个日志类,用生产与消费模式实现,消费模式用了异步执行
- redis队列-生产消费模式-简单实现
- 生产/消费模式实现
- 使用Condition实现多线程之间调用(生产消费模式)
- java使用BlockingQueue实现生产消费者模式
- 用java实现生产和消费
- 初次尝试ActiveMQ,实现简单的消息生产和消息消费
- 生产消费模式
- 机房重构时利用状态模式实现消费时间的计算
- 机房重构时利用状态模式实现消费时间的计算
- Java多线程实例->生产消费模式
- 生产/消费模型的java实现
- java使用LinkedBlockingQueue实现 生产者 消费者模式