您的位置:首页 > 其它

线程池系列三:动态修改线程池队列大小

2021-10-25 22:40 169 查看

线程池中的队列要求的是阻塞队列,作用主要是当线程池处理任务能力不足时,队列存储多余的任务,从而起到削峰和缓冲的目的。

可以选择的队列种类很多,如何选择合适的队列应用到自己的线程池中?就需要了解他们的优缺点,从而择优使用

1、常见阻塞队列

常见的阻塞队列都是以基于BlockingQueue的实现

ArrayBlockingQueue 一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

LinkedBlockingQueue 一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),此队列按FIFO (先进先出) 排序元素。Executors的几个静态线程池工厂方法大部分都是使用这个队列

SynchronousQueue 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;同理,当每个读操作的时候,同样需要一个相匹配的写操作。这里的 Synchronous 指的就是读写操作需要同步,一个读操作对应一个写操作。 注:吞吐量通常要高于LinkedBlockingQueue。静态工厂方法Executors.newCachedThreadPool使用了这个队列。

DelayQueue 是一个支持延时获取元素的无界阻塞队列。内部是基于PriorityQueue的实现。

PriorityBlockingQueue 一个具有优先级的无限阻塞队列。只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容 注:它是无界队列,put操作不会阻塞,但take方法在队列为空的时候会阻塞队列元素不可以插入null值,同时插入队列的元素必须是可比较大小的(comparable),否则报 ClassCastException 异常

2、最常使用的两种队列

2.1、ArrayBlockingQueue 是一个基于数组结构的有界阻塞队列,底层结构是一个数组

/** The queued items */
final Object[] items;
创建队列时,在构造器中创建的数组对象(最大的容量在创建时就确定了)

/** items index for next take, poll, peek or remove */
int takeIndex;
外界下次从队列获取数据时,指定从队列的takeIndex下标获取

/** items index for next put, offer, or add */
int putIndex;
外界下次向队列存入数据时,指定从队列的putIndex下标存入

/** Number of elements in the queue */
int count;
队列中实际存储数据的数量(即数组中真正有数据的元素数量)

/** Main lock guarding all access */
final ReentrantLock lock;
外界存储和读取队列数据时,存在并发情况。
这里使用一把锁同时控制读写数据。因此实际上读写是串行的。
正因如此,count的类型是int而非AtomicInteger,不需要考虑线程安全

/** Condition for waiting takes */
private final Condition notEmpty;
lock的Condition控制,含义是非空。
即队列无数据时,notEmpty.await()阻塞;有数据时,notEmpty.signal()。
从而控制线程间调度

/** Condition for waiting puts */
private final Condition notFull;
同样是lock的Condition控制,含义是非满,
即队列数据数量达到最大时,notFull.await()阻塞;
有数据时,notFull.signal()。从而控制线程间调度

外部存储数据时,从头开始向后遍历数组插入数据,并记录偏移量,供下次从偏移量位置再次存储;

而读取数据时也是一样,从头开始向后遍历数组读取并删除数据,记录偏移量供下次从偏移量位置再次读取

另外考虑一件问题:由于数组有界,总会读写到最后一个元素。数组前部分读取后置空,如果不再使用就浪费了。而后部分到达了尾部,队列不能再插入数据了

这就引入ArrayBlockingQueue的一个设计,即到达尾部后,若头部空置,则复用头部资源。

让线性的有界数组,在逻辑上成为环形数组,从而达到资源复用的目的。

2.2、LinkedBlockingQueue

一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),底层结构是一个单向链表

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
创建队列时,在构造器中指定队列最大的容量

/**Head of linked list.Invariant: head.item == null*/
transient Node<E> head;
链表的头部节点
这里的头部节点其实并没有存储真实数据,下一个节点才开始存储。
这是链表常用的结构,通过在头部增加一个空节点,从而使所有有效节点都是
链表中间节点,增删是可以统一处理。否则就要区分头部节点和中间节点,
导致区分处理了。

/**Tail of linked list.Invariant: last.next == null*/
private transient Node<E> last;
链表的尾部节点

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
外部多线程存储数据时,存在并发场景。因此使用putLock和notFull加锁控制。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
外部多线程读取数据时,存在并发场景。因此使用takeLock和notEmpty加锁控制。

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
链表中实际存储数据的数量,含义对应ArrayBlockingQueue的count。
但它的类型是AtomicInteger,由于链表读写为不同的锁,存在并发场景,
因此加减数量时要考虑并发安全问题

外部存储数据时在尾部插入数据;而读取数据时则从头部读取并删除节点数据

3、ArrayBlockingQueue和LinkedBlockingQueue的选择

| | ArrayBlockingQueue | LinkedBlockingQueue | |-|--|--| |底层结构 |数组
(逻辑上环形数组) | 单向链表 | | 是否有界 |有界阻塞队列
(大小必须声明) |有界阻塞队列
(不声明则默认为Integer.MAX_VALUE) | | 公平锁 | 可配置是否使用公平锁
(默认非公平) | 仅支持非公平锁 | |锁 |读写共用一把ReentrantLock锁
两个condition判断满空状态 | 读写分别使用一把ReentrantLock锁
使用各自的condition判断满空状态 | |count | 共用一把锁,因此队列内无并发,类型为int | 读写两把锁,队列内存在并发,因此类型为AtomicInteger | |其他 | | 由于是链表,因此插入的数据要多创建一个Node对象存,会对GC有影响 |

其他异同点:

  • LinkedBlockingQueue底层由于是链表,因此插入数据时要多创建一个Node对象,因此会对GC有影响
  • ArrayBlockingQueue从头开始遍历进行读写;而LinkedBlockingQueue则为链尾加元素,链尾取元素
  • LinkedBlockingQueue读写各加一把锁,通常比ArrayBlockingQueue具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。

4、自定义队列大小

无论是ArrayBlockingQueue和LinkedBlockingQueue,他们的队列大小都是不可变的。

ArrayBlockingQueue底层是数组,大小固定。

而LinkedBlockingQueue的capacity则被final修饰,不可修改。

而我们实际项目中,往往需要根据业务实际需要调整队列大小。那么如果实现队列的大小可变?

如果我们想基于ArrayBlockingQueue进行改造,但修改大小必然要涉及到重新创建数组,以及新旧数组的数据迁移问题,有些复杂。

如果考虑基于LinkedBlockingQueue进行改造,我们只要将修饰capacity的final去掉即可实现动态调整。但有一个问题,LinkedBlockingQueue具体实现中很多是基于capacity不变进行的设计,因此我们需要将涉及的功能进行调整

调整思路: 1、capacity可修改大小:去掉修饰词final,增加set方法便可动态调整 2、梳理受影响的范围:将代码中应用capacity的功能进行梳理,通过capacity的改动使其兼容正常功能

具体进行以下调整,其他内容不变:

4.1、类名修改 将LinkedBlockingQueue的代码实现拷贝并修改类名为ResizeLinkedBlockingQueue

4.2、将capacity的修饰词final去掉,增加volatile修饰词。改动后使其立即生效。并设置capacity的set方法,并在set时限制大小

4.3、梳理capacity所涉及的各个应用点,进行调整 主要是存储数据的几个方法条件判断时进行改动

------The End------



如果这个办法对您有用,或者您希望持续关注,也可以扫描下方二维码或者在微信公众号中搜索

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: