您的位置:首页 > 产品设计 > UI/UE

Java Concurrent包源码学习和使用心得 之 LinkedBlockingQueue源码解读

2014-06-11 17:00 627 查看
概述
LinkedBlockingQueue是java concurrent包提供的另一个多线程安全的阻塞队列,与ArrayBlockingQueu相比,此队列的使用链表实现(不熟悉链表的同学,请查阅大学的数据结构课本),可以提供高效的并发读写性能。

数据结构

链表节点

既然是链表,那么肯定少不了节点,节点自然包括节点内容和next指针。jdk开发人员,设计的节点是这样的:

static class Node<E> {        E item;        /**         * One of:         * - the real successor Node         * - this Node, meaning the successor is head.next         * - null, meaning there is no successor (this is the last node)         */        Node<E> next;        Node(E x) { item = x; }    }


在这里,用到了java范型的机制,用来保存不同类型的对象。

上述节点,提供了一个构造函数,用来传入需要保存的内容,这里的构造函数没有判断传入参数是否合法,因为在所有public方法中,已经判断过了,这里无需进行再次判断。

链表的指针

LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:

头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
尾指针用来管理元素入队,和 put(), offer() 两个操作关联

具体的数据结构定义如下:

private transient Node<E> head;    /* 头结点, 头节点不保存数据信息 */    private transient Node<E> last;      /* 尾节点, 尾节点保存最新入队的数据信息 */


链表的容量和大小

LinkedBlockingQueue是有大小限制的,当队列满后不能继续入队,同时,也有一个变量记录当前队列中的元素数量:

private final int capacity;    /* 队列容量一般使用中,构造LinkedBlockingQueue时,需要传入当前队列大小,如果不传入,默认是Integer.MAX_VALUE       */     private final AtomicInteger count = new AtomicInteger(0);  // 队列当前大小


注意:这里的count对象,是原子类型,而不是一般的int类型,与ArrayBlockingQueue中的不符,这是因为LinkedBlockingQueue使用读和写两把锁来控制并发操作,读和写可能同时修改count字段的值,而ArrayBlockingQueue只有一把锁用于控制读写操作,所以count对象是普通的,线程不安全的类型

控制并发的lock和condition

LinkedBlockingQueue中,读和写分别由两把锁控制,两把锁分别管理head节点和last节点的操作,如下所示:

private final ReentrantLock takeLock = new ReentrantLock();    /* 读锁 */    private final Condition notEmpty = takeLock.newCondition();     /* 读锁对应的条件 */    private final ReentrantLock putLock = new ReentrantLock();     /* 写锁 */    private final Condition notFull = putLock.newCondition();           /* 写锁对应的条件 */


jdk文档中,解释说,这两把锁的控制,是“two lock queue”算法的一种实现,但具体操作与其有些差异(A variant of the "two lock queue" algorithm.)

关于two lock queue可以参考:http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html

关键代码解读

入队和出队的核心操作

入队和出队的核心操作,就是对于链表头结点和尾节点的操作,与我们大学学习的数据结构基本一致。因为这些操作,都是private方法,外部已经进行了正确的同步,所以这些方法中,不带任何加锁和解锁的操作。

入队的代码如下所示:

private void enqueue(E x) {        last = last.next = new Node<E>(x);    }

上述代码其实是将三行写成了一行,为了方便学习,这里把其拆开:

private void enqueue(E x) {         Node<E> newNode = new Node<E>(x);   /* 新建一个Node对象,此对象的数据部分是新元素的指针,next指针为null */         last.next = newNode  ;                              /* 将目前list节点的next指针指向新对象 */         last =last.next;                                          /* 将last指针向后移动一个元素,指向新的尾端 */    }


出队的代码如下所示:

private E dequeue() {        Node<E> h = head;         /* 记录目前头节点的指针 */        Node<E> first = h.next;   /* 得到头结点后的第一个节点,即需要出队的数据节点 */                 /* 将需要出队的数据的尾节点设置为自己,让此对象变为孤立对象,GC可以进行回收,更重要的是,如果    迭代器引用此节点,迭代器可以通过判断next是否等于自己,来了解迭代器的下一个节点是否应该重定向为头节点 */
        h.next = h;                             head = first;                   /* 将头指针指向新的头节点 */        E x = first.item;              /* 获取数据元素的内容 */        /* 将头节点所在数据元素设置为null,因为头节点的数据已出队,如果此时再持有其引用,可能造成内存泄漏 */
        first.item = null;                     return x;    }


入队的public方法

入队的方法有两种,一种是阻塞的方法,另一种是非阻塞的方法,其中:

put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功

put的操作算法如下所示:

public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();             int c = -1;                                 final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {                          notFull.await();            }            enqueue(e);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        /* 如果完成当前入队操作后,队列依然有剩余的空间,那么再唤醒另一个等待入队的线程 */        } finally {            putLock.unlock();        }        if (c == 0)                        /* 如果入队前,队列大小为空,那么唤醒一个等待出队的线程 */            signalNotEmpty();            }

offer的算法与put类似,这里不再赘述。

出队的public方法

出队的方法与入队类似,也分为阻塞和非阻塞两种,其中:

take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null

task操作的算法如下:

public E take() throws InterruptedException {        E x;        int c = -1;        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();        try {                while (count.get() == 0) {                    notEmpty.await();                }            x = dequeue();            c = count.getAndDecrement();            if (c > 1)                notEmpty.signal();          /* 如果完成当前入队操作后,队列依然有剩余的元素,那么再唤醒另一个等待出队的线程 */        } finally {            takeLock.unlock();        }        if (c == capacity)            signalNotFull();                 /* 如果出队前,队列是满的,那么出队后,队列就空了,需要通知一个等待入队的线程 */        return x;    }

其余算法,与上述算法类似,这里不再赘述。

多线程安全的迭代器

LinkedBlockingQueue的迭代器,是多线程安全的,在获取元素之前,会对上述读锁和写锁同时加锁,同时,为了防止死锁,读锁和写锁的加解锁顺序,也是经过设计的,代码如下:

void fullyLock() {        putLock.lock();       // 先加写锁        takeLock.lock();     // 再加读锁    }
    void fullyUnlock() {        takeLock.unlock();  /* 先解锁读锁 */        putLock.unlock();   /* 再解锁写锁 */    }

LinkedBlockingQueue的迭代器中,保存了以下内容:

private Node<E> current;     /* 迭代器的下一个位置 */        private Node<E> lastRet;     /* 当前迭代器的位置 */        private E currentElement;    /* 当前需要返回的元素内容 */

刚看到代码时,觉得好像只要一个指向当前位置的指针就行了,干嘛这么麻烦呢,但JDK的开发人员考虑的比我们周全多了,这三个参数,足以应付任何多线程的问题:

首先,保存了当前需要返回的内容,可以保证在当前节点移除的情况下,迭代器的next()方法,也能返回当前指向的内容,即使先调用hasNext()方法,其他线程删除了当前对象,那么next()方法也可以保证返回正确对象
其次,如果在迭代器中,调用remove()方法,删除了当前对象,那么 lastRet方法就用上了,可以通过再次遍历列表,找到需要删除的对象,并将其删除,同时为了防止remove()方法被调用两次,在删除时,会将 lastRet设置为null,如果只有这一个指针,那么remove()之后,这个迭代器就啥也干不了了
最后,current保存了迭代器的下一个指向的位置,调用hasNext()时,可以立即直到是否还有空余对象,更重要的是,如果在迭代器创建后,其他线程多次调用了出队的方法,可能导致lastRet和current都变成悬挂的指针了,这时,只要判断current的next是否为自己,就可以知道自己是否已经被出队,是否需要重定向current的位置

关于迭代器的代码精髓,就是上面的描述了,具体代码,不再赘述。

对锁的精巧使用和思考

LinkedBlockingQueue将读和写操作分离,可以让读写操作在不干扰对方的情况下,完成各自的功能,提高并发吞吐量。

在写这篇文章时,我曾经考虑过,如果使用java内置的同步机制,即 synchronized 关键字进行此类读写锁控制,但实际上实现不了,因为java对象在wait和notify时,需要对lock变量加锁,这样就失去了双锁的优势,同时会导致死锁。

防止内存泄漏

设计链表,最大的一点,就是不能出现内存泄漏。

LinkedBlockingQueue在这点上已经做的很优秀,每次移除节点,都将节点的内容字段设置为null,迭代器也是如此,确保不会发生内存泄漏。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐