线程池系列三:动态修改线程池队列大小
线程池中的队列要求的是阻塞队列,作用主要是当线程池处理任务能力不足时,队列存储多余的任务,从而起到削峰和缓冲的目的。
可以选择的队列种类很多,如何选择合适的队列应用到自己的线程池中?就需要了解他们的优缺点,从而择优使用
1、常见阻塞队列
常见的阻塞队列都是以基于BlockingQueue的实现
ArrayBlockingQueue 一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue 一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),此队列按FIFO (先进先出) 排序元素。Executors的几个静态线程池工厂方法大部分都是使用这个队列
SynchronousQueue 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;同理,当每个读操作的时候,同样需要一个相匹配的写操作。这里的 Synchronous 指的就是读写操作需要同步,一个读操作对应一个写操作。 注:吞吐量通常要高于LinkedBlockingQueue。静态工厂方法Executors.newCachedThreadPool使用了这个队列。
DelayQueue 是一个支持延时获取元素的无界阻塞队列。内部是基于PriorityQueue的实现。
PriorityBlockingQueue
一个具有优先级的无限阻塞队列。只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容
注:它是无界队列,put操作不会阻塞,但take方法在队列为空的时候会阻塞队列元素不可以插入null值,同时插入队列的元素必须是可比较大小的(comparable),否则报 ClassCastException 异常
2、最常使用的两种队列
2.1、ArrayBlockingQueue 是一个基于数组结构的有界阻塞队列,底层结构是一个数组
/** The queued items */ final Object[] items; 创建队列时,在构造器中创建的数组对象(最大的容量在创建时就确定了) /** items index for next take, poll, peek or remove */ int takeIndex; 外界下次从队列获取数据时,指定从队列的takeIndex下标获取 /** items index for next put, offer, or add */ int putIndex; 外界下次向队列存入数据时,指定从队列的putIndex下标存入 /** Number of elements in the queue */ int count; 队列中实际存储数据的数量(即数组中真正有数据的元素数量) /** Main lock guarding all access */ final ReentrantLock lock; 外界存储和读取队列数据时,存在并发情况。 这里使用一把锁同时控制读写数据。因此实际上读写是串行的。 正因如此,count的类型是int而非AtomicInteger,不需要考虑线程安全 /** Condition for waiting takes */ private final Condition notEmpty; lock的Condition控制,含义是非空。 即队列无数据时,notEmpty.await()阻塞;有数据时,notEmpty.signal()。 从而控制线程间调度 /** Condition for waiting puts */ private final Condition notFull; 同样是lock的Condition控制,含义是非满, 即队列数据数量达到最大时,notFull.await()阻塞; 有数据时,notFull.signal()。从而控制线程间调度
外部存储数据时,从头开始向后遍历数组插入数据,并记录偏移量,供下次从偏移量位置再次存储;
而读取数据时也是一样,从头开始向后遍历数组读取并删除数据,记录偏移量供下次从偏移量位置再次读取
另外考虑一件问题:由于数组有界,总会读写到最后一个元素。数组前部分读取后置空,如果不再使用就浪费了。而后部分到达了尾部,队列不能再插入数据了
这就引入ArrayBlockingQueue的一个设计,即到达尾部后,若头部空置,则复用头部资源。
让线性的有界数组,在逻辑上成为环形数组,从而达到资源复用的目的。
2.2、LinkedBlockingQueue
一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),底层结构是一个单向链表
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; 创建队列时,在构造器中指定队列最大的容量 /**Head of linked list.Invariant: head.item == null*/ transient Node<E> head; 链表的头部节点 这里的头部节点其实并没有存储真实数据,下一个节点才开始存储。 这是链表常用的结构,通过在头部增加一个空节点,从而使所有有效节点都是 链表中间节点,增删是可以统一处理。否则就要区分头部节点和中间节点, 导致区分处理了。 /**Tail of linked list.Invariant: last.next == null*/ private transient Node<E> last; 链表的尾部节点 /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); 外部多线程存储数据时,存在并发场景。因此使用putLock和notFull加锁控制。 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); 外部多线程读取数据时,存在并发场景。因此使用takeLock和notEmpty加锁控制。 /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); 链表中实际存储数据的数量,含义对应ArrayBlockingQueue的count。 但它的类型是AtomicInteger,由于链表读写为不同的锁,存在并发场景, 因此加减数量时要考虑并发安全问题
外部存储数据时在尾部插入数据;而读取数据时则从头部读取并删除节点数据
3、ArrayBlockingQueue和LinkedBlockingQueue的选择
| | ArrayBlockingQueue | LinkedBlockingQueue |
|-|--|--|
|底层结构 |数组
(逻辑上环形数组) | 单向链表 |
| 是否有界 |有界阻塞队列
(大小必须声明) |有界阻塞队列
(不声明则默认为Integer.MAX_VALUE) |
| 公平锁 | 可配置是否使用公平锁
(默认非公平) | 仅支持非公平锁 |
|锁 |读写共用一把ReentrantLock锁
两个condition判断满空状态 | 读写分别使用一把ReentrantLock锁
使用各自的condition判断满空状态 |
|count | 共用一把锁,因此队列内无并发,类型为int | 读写两把锁,队列内存在并发,因此类型为AtomicInteger |
|其他 | | 由于是链表,因此插入的数据要多创建一个Node对象存,会对GC有影响 |
其他异同点:
- LinkedBlockingQueue底层由于是链表,因此插入数据时要多创建一个Node对象,因此会对GC有影响
- ArrayBlockingQueue从头开始遍历进行读写;而LinkedBlockingQueue则为链尾加元素,链尾取元素
- LinkedBlockingQueue读写各加一把锁,通常比ArrayBlockingQueue具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。
4、自定义队列大小
无论是ArrayBlockingQueue和LinkedBlockingQueue,他们的队列大小都是不可变的。
ArrayBlockingQueue底层是数组,大小固定。
而LinkedBlockingQueue的capacity则被final修饰,不可修改。
而我们实际项目中,往往需要根据业务实际需要调整队列大小。那么如果实现队列的大小可变?
如果我们想基于ArrayBlockingQueue进行改造,但修改大小必然要涉及到重新创建数组,以及新旧数组的数据迁移问题,有些复杂。
如果考虑基于LinkedBlockingQueue进行改造,我们只要将修饰capacity的final去掉即可实现动态调整。但有一个问题,LinkedBlockingQueue具体实现中很多是基于capacity不变进行的设计,因此我们需要将涉及的功能进行调整
调整思路: 1、capacity可修改大小:去掉修饰词final,增加set方法便可动态调整 2、梳理受影响的范围:将代码中应用capacity的功能进行梳理,通过capacity的改动使其兼容正常功能
具体进行以下调整,其他内容不变:
4.1、类名修改 将LinkedBlockingQueue的代码实现拷贝并修改类名为ResizeLinkedBlockingQueue
4.2、将capacity的修饰词final去掉,增加volatile修饰词。改动后使其立即生效。并设置capacity的set方法,并在set时限制大小
4.3、梳理capacity所涉及的各个应用点,进行调整 主要是存储数据的几个方法条件判断时进行改动
------The End------
如果这个办法对您有用,或者您希望持续关注,也可以扫描下方二维码或者在微信公众号中搜索
- 修改SDL_ttf,支持动态设置字体大小
- 微信小程序系列——动态修改页面标题
- jQuery动态改变图片显示大小(修改)
- Wordpress编辑器(Tinymce)在Chrome中动态修改图片大小
- MOSS 2013研究系列---动态修改WebConfig(上)
- SpringBoot系列十一:SpringBoot整合Restful架构(使用 RestTemplate 模版实现 Rest 服务调用、Swagger 集成、动态修改日志级别)
- 微信小程序的动态修改视图层的数据 —— 微信小程序教程系列(3)
- winform中怎么动态修改窗体的大小
- 动态修改listview,spinner控件字体大小的解决办法
- 使用Qt将一系列图片通过网络发送到客户端动态显示的参考代码(修改一下可以用作远程网络监控)
- 【学习ios之路:UI系列】修改图片的尺寸大小操作
- js动态修改下拉菜单的大小
- jq操作img大小(动态修改)
- Unity3D研究院之动态修改烘培贴图的大小&脚本烘培场景
- JavaScript动态修改弹出窗口大小的方法
- 什么是线程池,如何设计一个动态大小的线程池,应该有哪些方法?
- linux 修改消息队列大小
- winform中怎么动态修改窗体的大小
- Java中动态修改数组(Array)的大小
- 简单线程系列1-固定大小的线程池