并发容器学习—LinkedTransferQueue
一、LinkedTransferQueue并发容器
1.LinkedTransferQueue的底层实现
LinkedTransferQueue是一个底层数据结构由链表实现的无界阻塞队列,它与SynchronousQueue中公平模式的实现TransferQueue及其相似,LinkedTransferQueue中存储的也是操作而不是数据元素。可以对比着学习,更容易理解。先来看看LinkedTransferQueue结点的定义:
static final class Node { //用于标识结点的操作类型,true表示put操作,false表示take操作 final boolean isData; //结点的数据域,take类型的操作,该值为null,配对后则为put中的数据 //put类型的操作,该值为要转移的数据 volatile Object item; //当前结点的后继结点 volatile Node next; //等待的线程 volatile Thread waiter; // null until waiting //CAS方式更新后继结点 final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } //CAS方式更新数据域 final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } //构造方法 Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } //移除出队列,方便GC final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } //取消结点,就是本次取消操作 final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } //判断结点的操作是否已经被匹配了,结点的操作被取消也包含在匹配当中 //也就是说这个操作若是被取消了,也认为是匹配过的 final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } //判断当前结点的操作是不是未匹配过的REQUEST类型(take)的结点,true代表是 final boolean isUnmatchedRequest() { return !isData && item == null; } //判断结点的操作类型与其数据(item的值)是否相符合, //例如take操作item值应该是null,put操作item则应该是数据 final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } //尝试匹配一个数据结点,用在移除结点的方法中 final boolean tryMatchData() { // assert isData; Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); return true; } return false; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
2.LinkedTransferQueue的继承关系
LinkedTransferQueue的继承关系如下图所示,这么多的父类及接口中,只有一个TransferQueue接口未接触过,下面我们先来看看这个接口是干什么的。
public interface TransferQueue<E> extends BlockingQueue<E> { //尝试转移一个数据给一个正在等待消费者,如果没有等待的消费者立即返回false //转移失败的话,这个操作是不会入队等待被匹配的 boolean tryTransfer(E e); //转移一个数据给一个消费者,若没有正在等待的消费者,那么该转移操作会阻塞 //等待,或发生异常 void transfer(E e) throws InterruptedException; //在一定时间内尝试转移数据给一个消费者,如果没有正在等待的消费者,那就 //一直尝试到超时为止 boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; //是否有消费者在等待 boolean hasWaitingConsumer(); //获取等待的消费者数量 int getWaitingConsumerCount(); }
3.LinkedTransferQueue中重要的属性及构造方法
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements TransferQueue<E>, java.io.Serializable { //当前计算机CPU核心数是否大于1 private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; //结点为队列中第一个waiter时的自旋次数 private static final int FRONT_SPINS = 1 << 7; //前驱结点正在处理,当前结点需要自旋的次数 private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; //队列进行清理的阈值 static final int SWEEP_THRESHOLD = 32; //队列头结点 transient volatile Node head; //队列尾结点 private transient volatile Node tail; //移除结点链接失败(修改结点的next失败)的次数,当该值大于SWEEP_THRESHOLD //时,会对队里进行一次清理,清理掉哪些无效的结点 private transient volatile int sweepVotes; //下面四个值,用于标识xfer方法的类型 /** * NOW:代表不等待消费者,直接返回结果的类型。poll方法和tryTransfer方法中使用 * ASYNC:表示异步操作,直接添加数据元素到队尾,不等待匹配,用于offer,add,put方法中 * SYNC:同步操作,等待数据元素被消费者接受,用于take,transfer方法中 * TIMED:延时操作,等待一定时间后在返回匹配的结果,用于待超时时间的poll和tryTransfer方法中 */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer public LinkedTransferQueue() { } public LinkedTransferQueue(Collection<? extends E> c) { this(); addAll(c); } }
4.LinkedTransferQueue入队操作
LinkedTransferQueue中的入队方法包含有offer,put及add三种,这三个方法本质是都是一样的,都是调用的同一个方法且参数也都一样,并且因为LinkedTransferQueue是个无界阻塞队列,容量没有限制,因此不会出现入队等待的现象。
public void put(E e) { xfer(e, true, ASYNC, 0); } public boolean offer(E e, long timeout, TimeUnit unit) { xfer(e, true, ASYNC, 0); return true; } public boolean offer(E e) { xfer(e, true, ASYNC, 0); return true; } public boolean add(E e) { xfer(e, true, ASYNC, 0); return true; }
5.xfer方法分析
xfer方法是LinkedTransferQueue种最核心的一个方法,将其理解清楚,那么LinkedTransferQueue队列也就明白了。LinkedTransferQueue与SynchronousQueue中公平模式的实现TransferQueue是一样的,队列中存放的不是数据,而是操作(取出数据的操作take和放入数据的操作put)。队列中既可以存放take操作也可以存放put操作,但是队列中不能同时存在两种不同的操作,因为不同的操作会触发队列进行配对(操作出队)。
知道了这些我们再来看xfer方法的大致流程(超时等待部分和操作取消部分暂不分析,等分析源码时在说):当队列为空时,如果有一个线程执行take操作,此时对列中是没有对应的put操作与之匹配的,那么这个take操作就会入队,同时阻塞(也可能是自旋)执行这个操作的线程以等待匹配操作的到来;同理,空队列时来的是一个put操作,那么这个put操作也要入队阻塞等待匹配的take操作到来。而当队列不为空时(假设队列中都是take操作),某一线程执行put操作,此时队列检测到来了一个与队列中存放的操作相匹配的操作,那么就会将队首操作与到来的操作进行匹配,匹配成功,就会唤醒队首操作所在的线程,同时将已经匹配度额操作移除出队;而若是某一线程执行的是与队里中相同的操作,那么就将该操作直接添加到队尾。
//1.当e!=null 且haveData为true,how为ASYNC,nanos==0,表示没有超时设置的立即返回的放入数据的操作(put,add,offer) //2.当e==null 且haveData为false,how为SYNC,nanos==0,表示没有超时设置的等待匹配到放入数据的操作(take) //3.当e!=null 且haveData为true,how为SYNC,nanos==0,表示没有超时设置的等待匹配到取出数据的操作(transfer) //4.当e!=null 且haveData为true,how为TIMED,nanos>0,表示设置在超时等待时间内匹配取出数据的 //操作(tryTransfer(E e, long timeout, TimeUnit unit)) //5.当e==null 且haveData为false,how为TIMED,nanos>0,表示设置在超时等待时间内匹配放入数据的 //操作(poll(long timeout, TimeUnit unit)) //6.当e!=null 且haveData为true,how为NOW,nanos==0,表示立即匹配取出数据的操作(tryTransfer) //7.当e==null 且haveData为false,how为NOW,nanos==0,表示立即匹配放入数据的操作(poll) private E xfer(E e, boolean haveData, int how, long nanos) { //判断本次操作是不是放入数据的操作类型,若是则e不能为null if (haveData && (e == null)) throw new NullPointerException(); Node s = null; // the node to append, if needed retry: for (;;) { // restart on append race //从head开始匹配 for (Node h = head, p = h; p != null;) { // find & match first node boolean isData = p.isData; //获取队首结点的操作类型 Object item = p.item; //获取队首的数据 //判断队首结点是否已经被取消或匹配过了且队首结点的操作类型与其数据内容是否一致 //isData为true对应put操作,则item不能为null,反之item必须为null。 //因此,若是不一致说明p已经不是队首了,需要重新查找队首 if (item != p && (item != null) == isData) { // unmatched //此处判断本次操作应该是入队操作还是匹配操作,即判断与队首的操作类型是否一致 //若本次操作与队列中的操作类型(都是put或都是take)相同,那么需要将本次操作入队 //若是不同,那么需要将队首结点的操作与本次操作匹配 if (isData == haveData) // can't match break; //操作类型相同,退出当前循环,去执行入队步骤 //到此,说明队首操作与本次操作时相互匹配的,那么接下来需要做配对之后的工作 //尝试修改p中数据item为e,若修改成功,说明操作匹配成功 //若是修改失败,说明别其他线程抢先匹配了,那么就往队列后继续查找匹配 if (p.casItem(item, e)) { // match //本次操作已经与p匹配成功,那么p之前的结点要么是被匹配过,要么已经被取消 //都不能再做为head了,因此,这里需要将head更新 for (Node q = p; q != h;) { //获取后继结点 Node n = q.next; // update by 2 unless singleton //判断head是否还是h,若是,说明head还没被其他线程更新过,那当前线程可以尝试更新 //若是更新成功,说明h结点已经被移除出队了,那么就需要将其后继指针指向自身代表 //这个结点已被移除出队,方便GC回收 if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry //到这说明head已经被更新过,或是当前线程要更新head失败,那么就重新获取head并判断 //head是否为null(是否是空队列),若是则直接结束循环;若不是,再继续判断其后继结点 //是否为null(head是否是队列中最后一个结点),若是也直接结束循环(不需要再继续尝试 //更新head);若不是,在判断这个后继结点是否已经匹配过,若未匹配,那么也放弃更新head //这里可以总结出,head更新的要求:head不会随着队首被匹配就立即更新,head的更新会滞后 //只有当head及其后继都被匹配后,才会对head进行匹配;也就是说队列中要有至少两个结点匹配过 //会触发head的更新(即松弛度>2才更新head) if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); //p结点匹配成功,唤醒等待该结点的线程 return LinkedTransferQueue.<E>cast(item); } } //往p的后继继续查找未匹配的结点 Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } //到这说明队列中的操作与本次操作相同,只能将操作入队 //判断本次操作的模式,NOw为不等待,立即返回的模式 if (how != NOW) { // No matches available //s未初始化的话,进行初始化 if (s == null) s = new Node(e, haveData); Node pred = tryAppend(s, haveData); //尝试添加到队尾,并返回其前驱 if (pred == null) //返回前驱为null,说明添加失败,重新开始 continue retry; // lost race vs opposite mode //添加的结点不为异步模式,说明是同步或超时模式,那么要等待匹配 //若为异步模式,则不需要等待匹配,因为异步模式必然是add,offer,put //三个方法,不需要等待 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); } return e; // not waiting } } //结点自环 final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } //尝试添加结点到队尾 private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t;;) { // move p to last node and append Node n, u; // temps for reads of next & tail //判断队列是否为空,为空的话,直接更新head结点为s后结束,即空队列直接入队 if (p == null && (p = head) == null) { if (casHead(null, s)) return s; // initialize } //判断结点是否符合入队要求,即验证s结点的操作类型与p是否相同 else if (p.cannotPrecede(haveData)) return null; //不同,直接返回null //判断p是否有后继,新增结点是要添加到队尾的,而tail是可能滞后于队尾的, //且其他线程也可能抢先更新队尾,因此若p的后继不为null,说明当前p不是真正的 //队尾,需要推进查找队尾。 else if ((n = p.next) != null) p = p != t && t != (u = tail) ? (t = u) : (p != n) ? n : null; //n为null,说明找到队尾了,此时需要将p的后继更新成s,若是更新失败说明有其 //它线程抢先了,那么就重新获取队尾,再尝试 else if (!p.casNext(null, s)) p = p.next; // 继续查找队尾 //成功将s入队 else { //此时,若p不等t,说明t不是队尾,可以看看tail需不需要更新 if (p != t) { // update if slack now >= 2 /** * 判断是否需要更新tail,若是当前的tail离 * 真正的队尾不超过2个结点,那就暂时不更新tail * 若是超过的话,就更新tail while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t); } return p; } } } //阻塞或自旋等待匹配 private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { //计算截止时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); //获取当前线程 int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed //死循环 for (;;) { Object item = s.item; //s结点的数据 //判断s是否被匹配过,未匹配时item==e,匹配过或取消后item就会改变 if (item != e) { // matched // assert item != s; //s被匹配过,那么需要将item设为s本身,且waiter要恢复成null s.forgetContents(); // avoid garbage return LinkedTransferQueue.<E>cast(item); //返回数据 } //判断s所在的线程是否被中断过或者超时时间是否到了,那么就需要取消本次s //结点对应的操作了(将s.item设为s) if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // cancel unsplice(pred, s); //将s移除出队列 return e; } //下面都是进行的超时阻塞或者自旋操作 //判断自旋次数是否初始化过 if (spins < 0) { // establish spins at/near front //初始化自旋次数,即计算自旋次数 if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // spin --spins; //自旋次数递减 // 生成随机数来让出CPU时间 if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // occasionally yield } else if (s.waiter == null) { s.waiter = w; //设置等待线程 } else if (timed) { //计算超时等待时间 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); //超时阻塞 } else { LockSupport.park(this); //阻塞 } } } //将s结点移除出队列,即解除和前驱结点的链接 final void unsplice(Node pred, Node s) { //将s.item设为s本身,且waiter要恢复成null s.forgetContents(); // forget unneeded fields //当s的前驱不为null,且前驱与s不相同的条件下才能进行解除链接 if (pred != null && pred != s && pred.next == s) { Node n = s.next; //获取s结点的后继 //判断s是否有后继(s是否为队尾),若s有后继那么后继是不是s本身(s是否已匹配 //或取消了),若后继不是s自身,那么就尝试将pred的后继结点更新成s的后继n,若 //是更新成功,再判断pred是否已经被匹配过或取消了 if (n == null || (n != s && pred.casNext(s, n) && pred.isMatched())) { //更新head for (;;) { // check if at, or could be, head Node h = head; //获取head //h为pred或s或队列已空,那就不需要更新了,直接返回 if (h == pred || h == s || h == null) return; // at head or list empty //若是h未被匹配过,说明不需要更新,退出当前循环 if (!h.isMatched()) break; //获取h的后继 Node hn = h.next; if (hn == null) return; // now empty //只要h未被匹配或取消,就尝试更新head if (hn != h && casHead(h, hn)) //将h结点移除出队,h.next==h h.forgetNext(); // advance head } //s节点被移除后,需要记录删除的操作次数,如果超过阀值,则需要清理队列 if (pred.next != pred && s.next != s) { // 重新检查移除是否成功 for (;;) { // sweep now if enough votes int v = sweepVotes; //返回当前删除次数的记录 //判断是否超过阈值,没超过就更新记录,超过就将记录恢复为0 //并且清理队列 if (v < SWEEP_THRESHOLD) { if (casSweepVotes(v, v + 1)) break; } else if (casSweepVotes(v, 0)) { sweep(); //清理队列 break; } } } } } } //清理队列 private void sweep() { //遍历队列 for (Node p = head, s, n; p != null && (s = p.next) != null; ) { //判断s是否被匹配过,未被匹配过就继续向后遍历 if (!s.isMatched()) // Unmatched nodes are never self-linked p = s; //s节点被匹配,但是是尾节点,则退出循环,队尾就算被匹配了也不能直接 //移除 else if ((n = s.next) == null) // trailing node is pinned break; //判断s是否已经被移除了,若是,则重新从head开始清理 else if (s == n) // stale // No need to also check for p == s, since that implies s == n p = head; else p.casNext(s, n); //移除s出队列 } }
以上就是xfer的全部过程了,一个xfer方法直接包含了LinkedTransferQueue的所有功能,不仅add,put,offer方法是由其实现的,其他的如poll,take,transfer,tryTransfer方法也均是由其实现的,只不过参数不同:
public boolean tryTransfer(E e) { return xfer(e, true, NOW, 0) == null; } public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { Thread.interrupted(); // failure possible only due to interrupt throw new InterruptedException(); } } public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) return true; if (!Thread.interrupted()) return false; throw new InterruptedException(); } public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = xfer(null, false, TIMED, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) return e; throw new InterruptedException(); } public E poll() { return xfer(null, false, NOW, 0); }
6.LinkedTransferQueue中主要方法流程
1.offer,add,put三个异步放入数据的操作的大致过程如下:
2.take同步取出数据的大致流程如下:
3.transfer同步放入数据的流程大致如下:
4.tryTransfer带超时设置的放入数据的流程大致如下:
5.poll带超时设置的取出数据的流程大致如下:
6.tryTransfer不做任何等待放入数据(只做一次放入数据的尝试,失败直接结束)的流程大致如下:
7.poll只进行一次取出数据的操作,失败直接返回null,大致过程如下:
7.其他方法
//查找队列中第一个item不为null或结点本身的结点,将其item返回,不移除出队列 public E peek() { return firstDataItem(); } //遍历队列查找item不为null也不指向结点自身的结点,返回其item private E firstDataItem() { //遍历队列 for (Node p = head; p != null; p = succ(p)) { Object item = p.item; if (p.isData) { if (item != null && item != p) return LinkedTransferQueue.<E>cast(item); } else if (item == null) return null; } return null; } //获取结点的后继,若后继为自身(即已被移除出队),那么返回head final Node succ(Node p) { Node next = p.next; return (p == next) ? head : next; } //返回队列中的结点数 public int size() { return countOfMode(true); } //计算结点数 private int countOfMode(boolean data) { int count = 0; //遍历队列计算有效的结点数 for (Node p = head; p != null; ) { if (!p.isMatched()) { if (p.isData != data) return 0; if (++count == Integer.MAX_VALUE) // saturated break; } Node n = p.next; if (n != p) p = n; else { count = 0; p = head; } } return count; } //删除队列查询到的第一个item为o的结点 public boolean remove(Object o) { return findAndRemove(o); } private boolean findAndRemove(Object e) { if (e != null) { //遍历队列查找删除 for (Node pred = null, p = head; p != null; ) { Object item = p.item; if (p.isData) { //判断结点是否是需要删除的数据 if (item != null && item != p && e.equals(item) && p.tryMatchData()) { unsplice(pred, p); //将结点移除出队列 return true; } } else if (item == null) //item为null,说明队列中都是取出数据的操作,不可能有e了 break; pred = p; if ((p = p.next) == pred) { // stale pred = null; p = head; } } } return false; }
- JDK容器与并发—Queue—LinkedTransferQueue
- 高并发第十三弹:J.U.C 队列 SynchronousQueue.ArrayBlockingQueue.LinkedBlockingQueue.LinkedTransferQueue
- Java并发学习(二十)-ConcurrentLinkedQueue分析
- Java并发学习笔记(八)-LinkedBlockingQueue
- JDK容器与并发—Queue—LinkedBlockingQueue
- Java并发容器:ConcurrentLinkedQueue
- JDK并发工具类源码学习系列——LinkedBlockingQueue
- Java并发容器之ConcurrentLinkedQueue
- Java 并发容器和框架--ConcurrentLinkedQueue
- 并发基础_11_并发_容器_ConcurrentLinkedQueue
- 第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)
- Java concurrent Framework并发容器之ConcurrentLinkedQueue(1.6)源码分析 ??
- 深入浅出 Java Concurrency (20): 并发容器 part 5 ConcurrentLinkedQueue
- 并发容器分析(四)--ConcurrentLinkedQueue
- Java并发容器——ConcurrentLinkedQueue
- 3.集合--LinkedTransferQueue学习
- LinkedTransferQueue学习导引 推荐
- Java 并发 --- 阻塞队列之LinkedTransferQueue源码分析
- Java并发容器之ConcurrentLinkedQueue源码分析
- Java并发容器之非阻塞队列ConcurrentLinkedQueue