您的位置:首页 > 产品设计 > UI/UE

并发容器学习—ConcurrentLinkedQueue和ConcurrentLinkedDuque

2019-05-02 10:39 561 查看
一、ConcurrentLinkedQueue并发容器 1. ConcurrentLinkedQueue的底层数据结构     ConcurrentLinkedQueue是一个底层基于链表实现的无界且线程安全的队列。遵循先进先出(FIFO)的原则 。队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。它采用CAS算法来实现同步,是个非阻塞的队列。     底层链表由一个个Node结点组成,Node的定义如下:  
private static class Node<E> {
    volatile E item;    //存放数据
    volatile Node<E> next;    //指向下个结点

    //构造方法
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);    //unsafe操作赋值
    }

    //CAS方式尝试更新数据
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    
    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    //CAS方式更新下个结点地址
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;    //item的内存地址偏移量
    private static final long nextOffset;    //next的内存地址偏移量

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

2. ConcurrentLinkedQueue的继承关系     了解了底层的基本实现,再来看看 ConcurrentLinkedQueue的继承关系,如下图所示, ConcurrentLinkedQueue继承了AbstractQueue即实现了Queue接口。     之前在ArrayList及LinkedList的学习时 Queue及 AbstractCollection都已学过,不在赘言,直接来看 AbstractQueue的源码:  
public abstract class AbstractQueue<E>
    extends AbstractCollection<E>
    implements Queue<E> {

    protected AbstractQueue() {
    }

    //向队列末尾新增e元素
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

    //删除队首元素,并将其返回
    public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
    
    //获取队首元素,但不移除出队列
    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }

    //清空队列中的所有元素
    public void clear() {
        while (poll() != null)
            ;
    }

    //将集合c中的所有元素一次添加到队列末尾
    public boolean addAll(Collection<? extends E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean modified = false;
        for (E e : c)
            if (add(e))
                modified = true;
        return modified;
    }

}

 

3. 重要属性及构造方法     了解了 ConcurrentLinkedQueue的继承关系,再来看构造方法和一些重要的属性  
//底层链表的头结点
private transient volatile Node<E> head;

//底层链表的尾结点
private transient volatile Node<E> tail;

//空构造,创建了一个空队列
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

//以结合c中的元素创建一个队列
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}

 

  4.入队的实现     队列中添加的方法有两个,分别add和offer,效果没有什么区别,接下来看看实现的过程:  
//由源码可见,add的本质还是调用了offer方法
public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    //判断待添加的元素是否为null,说明ConcurrentLinkedQueue中不允许null元素
    checkNotNull(e);    
    final Node<E> newNode = new Node<E>(e);    //新建结点

    
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        //判断p是否为尾结点
        if (q == null) {
            //CAS方式尝试更新p结点的next结点为newNode结点,失败的话继续循环尝试
            if (p.casNext(null, newNode)) {
                //p的next结点更新成功,说明队列尾结点改变了就继续尝试更新tail的值
                //这里判断p!=t,说明tail不是实际的尾结点,应该要更新了,但并不强制
                //要求一定要更新成功,即不要求tail一定要指向队列的尾结点,允许tail滞后
                //于真正的尾结点
                if (p != t) 
                    casTail(t, newNode);  //更新tail,失败也没关系
                return true;
            }
        }
        else if (p == q)

            /** 
            * p == q说明当前p结点已经被移除出队了,需要重新获取head来进行入队操作
            * 
            * 对于已经移除出队的元素,会将next置为本身,
            * 用于判断当前元素已经出队,接着从head继续遍历。 
            * 
            * 在整个offer方法的执行过程中,p一定是等于t或者在t的后面的, 
            * 因此如果p已经不在队列中的话,t也一定不在队列中了(FIFO)。 
            * 
            * 所以重新读取一次tail到快照t, 
            * 如果t未发生变化,就从head开始继续下去。 
            * 否则让p从新的t开始继续尝试入队是一个更好的选择(此时新的t很可能在head后面) 
            */
            p = (t != (t = tail)) ? t : head;
        else
            /** 
            * 若p与t相等,则让p指向next结点。 
            * 若p和t不相等,则说明已经经历多次入队失败了(可能被插队了), 
            * 则重新读取一次tail到t,如果t发生了变化(确实被插队了),则从t开始再次尝试入队。 
            */
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

 

  5.出队的过程  
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            //判断item是否为null,即判断p结点是否要被移除出队
            //若item不为null,则尝试更新item为null,
            //因为item若为null表示结点标记为要被移除
            if (item != null && p.casItem(item, null)) {
                //判断p与h是否还相同
                //p与h不相同,说明head可能滞后,即head可能已经不是指向队首结点,
                //尝试更新head为p.next(p.next若为null,则说明p为队尾了,head只能更新为p)
                //p与h相同则直接返回
                if (p != h) 
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }

            //判断p是否为队尾,也就是队列是否已经空了
            //若队列已经空了,则尝试更新head为p
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //p的next结点若是存在,还需要判断是否在队列中
            //若p==q,说明p已经不再队列中了,此时需要重新获取head
            //的快照h,并让p=h,尝试移除结点
            else if (p == q)
                continue restartFromHead;
            else
                p = q;    // 继续向后走一个节点尝试移除结点
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);    //h的next结点设置为自身
}

 

  6.出入队的过程     ConcurrentLinkedQueue的出入队操作并不是使用加锁的方式实现的线程安全,而是通过无锁的CAS算法实现的,这就使得其代码实现虽然简单,但理解起来晦涩难懂。 ConcurrentLinkedQueue是不允许入队元素时null值的(结点的item不能为null),因为 ConcurrentLinkedQueue对已出队的结点会将item赋上null值。也就是说某个结点的item若为null,则说明该结点是要被删除的结,那么久可以将其重队列中移除了。 ConcurrentLinkedQueue除了对结点有以上要求外,其自身则有如下特点:     1.队列中的所有结点在任意时刻只有最后一个结点的next是为null的。     2.要求head和tail属性不能是null(可以是空结点,即item和next为null)。     3.head和tail具有滞后性,head指向的不一定是队首结点,tail指向的也不一定是队尾结点。       下面以图解的形式先演示入队的过程:     初始状态,队列中没有结点,此时head==tail,指向一个空结点。         有第一个结点要入队,通过自旋尝试入队,此时q为p.next,即为null,那么就尝试更新p.next为要新增的结点,如果p.next更新成功,入队成功,再判断p与t是否相同,即是否需要尝试更新tail(p!=t说明tail没有指向队尾),然后结束入队操作;更新p.next失败则继续尝试,直到成功为止(如上右图所示)。     再有第二个结点入队,得到如上图所示,此时q==node1不为null,且p!=q,令p=q指向下个结点重新尝试入队。        此时q==null,尝试更新p.next为要新增的结点,如果p.next更新成功,入队成功(如上右图所示);失败则继续尝试。     此时判断p!=t,说明tail的指向已经滞后了,没有指向队尾结点,可以尝试更新了,更新成不成功都没有关系,因为不成功也没事,不成功说明有其他线程已经抢先更新过了。成功则tail指向新增结点2.     再接下来,入队结点3,此时p=t=tail,q为null,与加入第一个结点过程相同,尝试更新p.next为要新增的结点,成功则结束入队操作;失败则循环继续尝试。     继续入队结点4,此时q==node3不为null,且p!=q,令p=q重新尝试入队。     此时q==null,尝试更新p.next为要新增的结点,这里假定更新失败,即有其他线程抢先入队了结点x,且tail也被更新。     此时p与t不相同,且t与tail也不相同,即tail已经改变,此时结点4要入队只能在新的tail之后去尝试入队,因此直接令p=tail去继续尝试入队。     到此在重复前面的入队步骤,q==null,尝试更新p.next为node4.成功则结束。       出队的过程,以上面入完5个结点开始出队过程的分析:     结点1开始出队,此时p=h=head,p.item==null,q=p.next;则可知head结点现在是滞后状态,指向的并不是队首结点,需要查找队首结点,令p=q。     这时p.item!=null,尝试将p.item的值更新为null,因为head之后第一个item不为null的结点即是队首结点,也就是要移除出队的结点,而要被移除的结点的item要被标记成null值,标记成功说明该结点可以删除出队了;若尝试更新失败,说明被其他线程抢先出队,那么就重复上一步继续查找新队首,再尝试出队操作。     若p.item更新成功则判断此时p与h是否相同,若是相同则直接返回item;若是不相同,说明head此时已经滞后了,那么可以尝试更新head(head若是更新成功则h结点的next指向h自身,说明该结点已不再队列中)。       到此,第一个结点的移除就结束了。     此时,再继续移除队首结点2,如上图所示,有p=h=head,p.item为null,q=p.next且不为null(有后继结点),令p=q往后继续查找队首。     此时p.item=2,不为null,说明找到队首,可以尝试更新结点2的item值,假定此时更新失败,则说明结点2被其他线程抢先移除出队了,那么此时需要继续查找队列中第一个item不为null的结点来出队。     到此则有p指向node3,此时p.item依旧不为null,则可以执行更新结点3的item,若是更新成功,且head更新失败,则可得到如下图所示结果。     若是继续移除结点x,那么就需要重head开始,遍历到结点x出才可能执行移除出队操作,我们假定在遍历时(在p=h=head之后,结点x正好被移除),有其它线程抢先移除了结点x,并且更新了head的位置,且原本的h的nexr指向h自身。     此时有p.item==null,且p==q且不为null,那么需要重新获取p=h=head,得到如下图所示结果。     到此,又回到移除队首的初始状态,此时p.item==null,令p=q获取下个结点。     此时,p.item不为null,那么久要尝试更新p.item,假设更新失败,那么此时q=p.next为null,即node4为队尾结点且已经被其他线程抢先移除出队了,那么能做的只剩尝试更新head结点,并且返回null了(队列中没有结点可以移除了,只能返回null)。     从这里还可以看出在 ConcurrentLinkedQueue中head是可以在tail的后面的,这是由于head和tail的滞后性带来的影响。   7.其他的方法     
//peek的原理与poll差不多,只是peek中获取到队首后,不去进行CAS的更新item操作
//只将item值返回即可
public E peek() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            //判断是不是队首结点,即通过item是否为null,判断当前结点
            //是否已被移除出队
            //判断p.next是否为null,则是为判断队列是否是空队列
            //若是空队列也可结束了
            if (item != null || (q = p.next) == null) {
                updateHead(h, p);    //尝试更新滞后的head
                return item;
            }
            //判断结点是否已经被移除出队,是的话要重新获取head来查找队首
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

//统计队列中的元素个数,瞬时值,不能太过依赖
public int size() {
    int count = 0;
    //获取队首结点,然后遍历队列挨个统计
    for (Node<E> p = first(); p != null; p = succ(p))
        //判断结点是不是要被移除,或已被移除(item为null,说明是被遗弃的结点,不需要统计)
        if (p.item != null)    
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

//获取队首元素,与peek基本一致,只不过返回的是结点,而peek返回的是item
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

//获取后继结点,若是后继结点是自身(已被移除),那么返回head
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

二、ConcurrentLinkedDueue并发容器

1.ConcurrentLinkedDueue

    ConcurrentLinkedDueue的底层数据实现与ConcurrentLinkedQueue类似,都是链表,不同的是ConcurrentLinkedDueue是双向链表,因此ConcurrentLinkedDueue既可以当做队列也可当做栈来使用。并且ConcurrentLinkedDueue实现线程安全的,非阻塞的方式与ConcurrentLinkedQueue一样都是采用CAS算法。若ConcurrentLinkedDueue当做队列使用那么与ConcurrentLinkedQueue没有区别,效率也相同,源码也十分类似,这里就不做过多分析。

    

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  CAS