Java Thread&Concurrency(5): 深入理解Phaser实现原理
2014-06-12 16:10
881 查看
背景(来自注释):
Phaser作为一个可复用的同步屏障来使用,在功能上相似于CyclicBarrier和CountDownLatch,但是支持更灵活的使用场景。
Registration.不像其他的屏障,在一个Phaser上注册的同步对象个数是可变的。任务能够在任意时刻被注册(使用类似于register方法/bulkRegister方法,或者在构造的一开始注册),以及能够在到达之后可选择地撤销(使用arriveAndDeregister方法)。像大多数基本的同步工具一样,注册和撤销只会影响内部的计数,他们不会在内部保留对注册对象的跟踪,所以任务不能通过查询得知自身是否被注册。(然而,你能够引入类似的跟踪策略通过扩展这个类)。
Synchronization.类似于CyclicBarrier,一个Phaser能够重复用于等待。方法arriveAndAwaitAdvance有类似于CyclicBarrier.await的效果。phaser的每一代都伴随有一个阶段数。这个阶段数开始为0,当所有注册对象到达之后会递增,重新调整为0当达到Integer.MAX_VALUE时。阶段(phase)的使用可以让独立的几个行动抵达在phaser上等待其他的行动,通过两种类型的方法能够让已注册对象达到这一点:
Arrival.arrive和arriveAndDeregister记录到达。这些方法不会阻塞,但是返回一个伴随的阶段数;也就是这一次到达所使用的阶段数。当最后一个注册对象抵达之后,一个可选的行动将会被执行以及阶段数会递增。被注册对象执行的方法会触发阶段数的递增,已经通过覆盖方法onAdvance,能够控制Phaser的结束。覆盖这个方法类似于通过提供一个屏障行动给CyclicBarrier,但是更灵活。
Waiting.方法awaitAdvance需要一个参数指示一个抵达阶段数,以及返回当阶段数抵达一个不一样的阶段数。不同于CyclicBarrier的是,awaitAdvance会在中断之后继续等待。中断和超时的版本同时提供,但是中断和超时不会改变phaser的状态。如果需要,你可以执行这些异常的伴随处理器,通过调用forceTermination。Phaser也可以被在ForkJoinPool中执行的任务使用,这样可以确保充足的并行性,当其他任务在一个阶段上阻塞时。
Termination.一个phaser能够进入termination状态,这个状态可以通过isTerminated来检查。在结束之后,所有同步方法立刻返回不需要再等待阶段改变,返回一个负数。类似的,尝试在结束之后注册对象不会有任何影响。结束会在一个调用onAdvance返回true之后被触发。默认的实现返回true说明撤销操作已经使得注册者个数为0。正如在下面所说,可以通过覆盖这个方法从而让阶段数抵达一个阈值时结束phaser,从而实现固定的迭代操作。方法forceTermination也可以用作突然释放等待线程以使他们结束。
Tiering. Phasers能够被分层(通过构造树结构)从而降低竞争。一个拥有大量参与者的Phaser能够通过构造拥有同一个公共父Phaser的结构来降低严重的同步竞争开销。这样能够极大地增加吞吐量,即使它带来每个操作更大的开销。
在一个分层phasers的树结构中,注册和撤销孩子phasers以及它们的父phaser是被自动管理的。当一个子phaser的注册数变为非0时(通过Phaser(Phaser,int)、constructor、register、bulkRegister),子phaser被父phaser注册。当注册数变为0,子phaser会从父phaser中撤销。
Monitoring.同步方法只能够被已注册的对象调用,phaser的当前状态能够被每一个调用者监视。在任何给定的时刻拥有{getRegisteredParties}个参与者,以及{getArriveParties}个对象已经抵达当前的阶段{getPhase}。当剩余的{getUnarrivedParties}个对象到达时,这个阶段数递增。但是这些方法返回的只是瞬时数据,所以在同步控制中不是非常有用。方法{toString}返回当前监视的这些状态的一个快照。
使用场景:
Phaser可以代替CountDownLatch来构建一个服务于多个对象只执行一次的行动。
这个典型的应用惯例是注册-->执行任务-->撤销,例如:
可以通过覆盖onAdvance来实现让多个线程把某些任务执行固定的次数。
如果主任务需要等待phaser的结束,那么它可以不断地注册自己从而执行一个类似循环:
你可以等待特定的不超过Integer.MAX_VALUE的阶段数。例如:
创建一个集合的任务可以使用一个树结构的phasers,你能够使用以下的代码,假如一个Task类有一个构造函数接受一个Phaser从而进行注册。在调用build(new Task
, 0, n, new Phaser())后,这些任务能够被执行,
比如提交给一个线程池:
注意:我们的这个实现将参与者的上限定为65535.尝试注册更多的参与者会导致IllegalStateException异常。当然,你可以通过使用分层Phaser的方法去满足任意数量的参与者。
算法:
当前类(Phaser)实现了一个X10"clocks“的扩展。
主要的64位状态变量(state),被划分为4个域:
unarrived--没有抵达屏障的参与者个数(bits 0-15)
parties --需要等待的参与者个数 (bits 16-37)
phase --屏障的阶段数 (bits 32-62)
terminated --1为结束 (bits 63/sign)
为了能够高效地维护状态的原子性,这些值被打包放进单个64位原子变量。通过简单的方式编码/解码状态变量以及竞争过程短暂,来获得高性能。
所有状态都是通过CAS操作,除了一开始的对于子phaser的注册(拥有一个非null的父phaser),在这种少见的场景中,我们在第一次注册时使用内置同步方式来加锁。
子phaser的阶段数允许滞后于它的祖先知道它能够被使用--查看方法reconcileState。
实现:
详情如下:
首先判断将要注册的个数(parties>>>PARTIES_SHIFT!=0),如果成立那么说明参与方太多,直接抛出异常。
设置当前phaser的父phaser以及阶段数phase为0,假如父phaser不为null,那么取得父phaser的根root,设置当前root以及两个队列evenQ和oddQ,当注册数不为0时,在父phaser中注册当前phaser。
否则只需要设置自身为root以及初始化evenQ和oddQ。
设置状态变量state:0-15位作为unarrived个数(若parties为0则用1),16-31位作为parties的值,32-62位作为phase(阶段数)的值,最高位63作为phaser是否结束的标志(1为结束)。
我们接着再来看register(注册)操作:
这个方法的作用是为phaser增加一个新的参与方,假如此时unarrived为0,并且onAdvance方法正在被调用,这个方法会阻塞直到phaser的那部分工作完成。如果这个phaser有一个父phaser,并且这个子phaser没有注册过对象,那么这个子phaser也会在它的父phaser上面注册。如果当前phaser已经结束,那么尝试去注册不会有任何影响并且一个负数会返回。
这个方法返回当前这个注册完成时的阶段数。如果注册过多参与者会抛出IllegalStateException异常。
详情如下:
调用doRegister(1),1为注册个数。
doRegister中,首先获得需要递增的值adjust,获得父phaser,然后进入循环。
首先调整state(状态值)(reconcileState),因为阶段数要以root中的为准,所以reconcileState方法会取得root中阶段数组合当前的paties和unarrived值成为新的state状态。
之后假如phase小于0,既phaser已经结束,返回负数。
假如需要注册个数过大,爬出异常。
假如当前counts不为空(parties或者unarrived不为空),那么当parent不为空或者状态数state稳定(阶段数不变)时,假如发现unarrived为0则等待(被状态数改变之后唤醒然后重试),否则尝试调整当前的state状态值。
假如counts为空并且parent为空,则尝试直接改变当前的状态值state(即为调整parties和unarrived值)。
假如counts为空并且parent不为空,那么说明不仅需要改变当前的state,还需要去父phaser中注册,这里使用内置同步块的方式,当phase小于0时返回,在成功调整state之后跳出返回阶段值。
我们接着再来看arriveAndAwaitAdvance(抵达并等待)操作:
这个方法在参与者注册之后被调用的,作为是声明自己抵达改phaser并等待其他参与者抵达。等价于awaitAdvance(arrive()),如果你想要使用可中断或者超时的方式来等待,那么你可以使用相似的方法通过另外的形式来调用awaitAdvance来达到目的。如果你还需要再抵达之后撤销,那么可以使用awaitAdvance(arriveAndDeregister())。这个方法不能被没有注册的参与方调用,否则可能抛出IllegalStateException异常。
详情如下:
首先取得跟phaser,进入循环。
可能调整并取得最新的状态数state(可能会更新phase)。
取得阶段数(phase),小于0则返回(结束)。
取得counts(该值为当前parties和unarrived的或操作,若为EMPTY说明当前phaser上无任何参与方)。
取得unarrived值,小于或等于0抛出IllegalStateException异常(说明有未注册的一方调用了此方法)。
试着CAS改变state的值(仅unarrived的部分),失败重试。
成功之后,根据unarrived判断,假如>1说明不是最后一个抵达的参与方,所以调用internalAwaitAdvance方法在root上等待。
否则,说明当前是改phaser的最后一个抵达的参与方,那么假如root!=this(当前phaser不是根),就调用parent(父phaser)的arriveAndAwaitAdvance方法(同名方法)。
否则当前phaser就为根,那么保留注册数n(s & PARTIES_MASK ),根据可扩展的onAdvance方法、以及当前注册数以及递增之后的阶段数(phase)来CAS构造新的状态值(state)。这一步若失败说明有外部的终止调用,直接返回后32位(最高位1)即可。
成功之后返回新的状态值。
doArrive主要被arrive和arriveAndDeregister来调用,与arriveAndAwaitAdvance的代码非常类似(所以不给出详情),主要的区别在于:
doArrive中可能会有撤销操作,而arriveAndAwaitAdvacne中只是抵达操作,所以当nextUnarrived(PARTIES)的值为0时,需要对父phaser也发起传递的撤销操作。
arriveAndAwaitAdvance中只需要调用父phaser的同名方法,但是doArrive中会根据nextUnarrived的值来区分调用父phaser的同名方法时的参数
存在的问题:
Phaser作为一个可复用的同步屏障来使用,在功能上相似于CyclicBarrier和CountDownLatch,但是支持更灵活的使用场景。
Registration.不像其他的屏障,在一个Phaser上注册的同步对象个数是可变的。任务能够在任意时刻被注册(使用类似于register方法/bulkRegister方法,或者在构造的一开始注册),以及能够在到达之后可选择地撤销(使用arriveAndDeregister方法)。像大多数基本的同步工具一样,注册和撤销只会影响内部的计数,他们不会在内部保留对注册对象的跟踪,所以任务不能通过查询得知自身是否被注册。(然而,你能够引入类似的跟踪策略通过扩展这个类)。
Synchronization.类似于CyclicBarrier,一个Phaser能够重复用于等待。方法arriveAndAwaitAdvance有类似于CyclicBarrier.await的效果。phaser的每一代都伴随有一个阶段数。这个阶段数开始为0,当所有注册对象到达之后会递增,重新调整为0当达到Integer.MAX_VALUE时。阶段(phase)的使用可以让独立的几个行动抵达在phaser上等待其他的行动,通过两种类型的方法能够让已注册对象达到这一点:
Arrival.arrive和arriveAndDeregister记录到达。这些方法不会阻塞,但是返回一个伴随的阶段数;也就是这一次到达所使用的阶段数。当最后一个注册对象抵达之后,一个可选的行动将会被执行以及阶段数会递增。被注册对象执行的方法会触发阶段数的递增,已经通过覆盖方法onAdvance,能够控制Phaser的结束。覆盖这个方法类似于通过提供一个屏障行动给CyclicBarrier,但是更灵活。
Waiting.方法awaitAdvance需要一个参数指示一个抵达阶段数,以及返回当阶段数抵达一个不一样的阶段数。不同于CyclicBarrier的是,awaitAdvance会在中断之后继续等待。中断和超时的版本同时提供,但是中断和超时不会改变phaser的状态。如果需要,你可以执行这些异常的伴随处理器,通过调用forceTermination。Phaser也可以被在ForkJoinPool中执行的任务使用,这样可以确保充足的并行性,当其他任务在一个阶段上阻塞时。
Termination.一个phaser能够进入termination状态,这个状态可以通过isTerminated来检查。在结束之后,所有同步方法立刻返回不需要再等待阶段改变,返回一个负数。类似的,尝试在结束之后注册对象不会有任何影响。结束会在一个调用onAdvance返回true之后被触发。默认的实现返回true说明撤销操作已经使得注册者个数为0。正如在下面所说,可以通过覆盖这个方法从而让阶段数抵达一个阈值时结束phaser,从而实现固定的迭代操作。方法forceTermination也可以用作突然释放等待线程以使他们结束。
Tiering. Phasers能够被分层(通过构造树结构)从而降低竞争。一个拥有大量参与者的Phaser能够通过构造拥有同一个公共父Phaser的结构来降低严重的同步竞争开销。这样能够极大地增加吞吐量,即使它带来每个操作更大的开销。
在一个分层phasers的树结构中,注册和撤销孩子phasers以及它们的父phaser是被自动管理的。当一个子phaser的注册数变为非0时(通过Phaser(Phaser,int)、constructor、register、bulkRegister),子phaser被父phaser注册。当注册数变为0,子phaser会从父phaser中撤销。
Monitoring.同步方法只能够被已注册的对象调用,phaser的当前状态能够被每一个调用者监视。在任何给定的时刻拥有{getRegisteredParties}个参与者,以及{getArriveParties}个对象已经抵达当前的阶段{getPhase}。当剩余的{getUnarrivedParties}个对象到达时,这个阶段数递增。但是这些方法返回的只是瞬时数据,所以在同步控制中不是非常有用。方法{toString}返回当前监视的这些状态的一个快照。
使用场景:
Phaser可以代替CountDownLatch来构建一个服务于多个对象只执行一次的行动。
这个典型的应用惯例是注册-->执行任务-->撤销,例如:
void runTasks(List<Runnable> tasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
可以通过覆盖onAdvance来实现让多个线程把某些任务执行固定的次数。
void startTasks(List<Runnable> tasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
如果主任务需要等待phaser的结束,那么它可以不断地注册自己从而执行一个类似循环:
phaser.register(); while (!phaser.isTerminated()) phaser.arriveAndAwaitAdvance()
你可以等待特定的不超过Integer.MAX_VALUE的阶段数。例如:
void awaitPhase(Phaser phaser, int phase) { int p = phaser.register(); // assumes caller not already registered while (p < phase) { if (phaser.isTerminated()) // ... deal with unexpected termination else p = phaser.arriveAndAwaitAdvance(); } phaser.arriveAndDeregister(); }
创建一个集合的任务可以使用一个树结构的phasers,你能够使用以下的代码,假如一个Task类有一个构造函数接受一个Phaser从而进行注册。在调用build(new Task
, 0, n, new Phaser())后,这些任务能够被执行,
比如提交给一个线程池:
void build(Task[] tasks, int lo, int hi, Phaser ph) { if (hi - lo > TASKS_PER_PHASER) { for (int i = lo; i < hi; i += TASKS_PER_PHASER) { int j = Math.min(i + TASKS_PER_PHASER, hi); build(tasks, i, j, new Phaser(ph)); } } else { for (int i = lo; i < hi; ++i) tasks[i] = new Task(ph); // assumes new Task(ph) performs ph.register() } }最好的{TASKS_PER_PHASER}依赖于期待的同步速率。很小的数字比如4适合于执行非常小的任务(所以速率高),或者成百上千适合于使用大任务。
注意:我们的这个实现将参与者的上限定为65535.尝试注册更多的参与者会导致IllegalStateException异常。当然,你可以通过使用分层Phaser的方法去满足任意数量的参与者。
算法:
当前类(Phaser)实现了一个X10"clocks“的扩展。
主要的64位状态变量(state),被划分为4个域:
unarrived--没有抵达屏障的参与者个数(bits 0-15)
parties --需要等待的参与者个数 (bits 16-37)
phase --屏障的阶段数 (bits 32-62)
terminated --1为结束 (bits 63/sign)
为了能够高效地维护状态的原子性,这些值被打包放进单个64位原子变量。通过简单的方式编码/解码状态变量以及竞争过程短暂,来获得高性能。
所有状态都是通过CAS操作,除了一开始的对于子phaser的注册(拥有一个非null的父phaser),在这种少见的场景中,我们在第一次注册时使用内置同步方式来加锁。
子phaser的阶段数允许滞后于它的祖先知道它能够被使用--查看方法reconcileState。
实现:
public Phaser(Phaser parent, int parties) { if (parties >>> PARTIES_SHIFT != 0) throw new IllegalArgumentException("Illegal number of parties"); int phase = 0; this.parent = parent; if (parent != null) { final Phaser root = parent.root; this.root = root; this.evenQ = root.evenQ; this.oddQ = root.oddQ; if (parties != 0) phase = parent.doRegister(1); } else { this.root = this; this.evenQ = new AtomicReference<QNode>(); this.oddQ = new AtomicReference<QNode>(); } this.state = (parties == 0) ? (long)EMPTY : ((long)phase << PHASE_SHIFT) | ((long)parties << PARTIES_SHIFT) | ((long)parties); }这个构造函数的作用为,根据父Phaser和参与方个数创建一个Phaser。当给出的父phaser不为null以及给出的参与方个数大于0时,这个子phaser会注册到父phaser中。
详情如下:
首先判断将要注册的个数(parties>>>PARTIES_SHIFT!=0),如果成立那么说明参与方太多,直接抛出异常。
设置当前phaser的父phaser以及阶段数phase为0,假如父phaser不为null,那么取得父phaser的根root,设置当前root以及两个队列evenQ和oddQ,当注册数不为0时,在父phaser中注册当前phaser。
否则只需要设置自身为root以及初始化evenQ和oddQ。
设置状态变量state:0-15位作为unarrived个数(若parties为0则用1),16-31位作为parties的值,32-62位作为phase(阶段数)的值,最高位63作为phaser是否结束的标志(1为结束)。
我们接着再来看register(注册)操作:
public int register() { return doRegister(1); }
private int doRegister(int registrations) { // adjustment to state long adjust = ((long)registrations << PARTIES_SHIFT) | registrations; final Phaser parent = this.parent; int phase; for (;;) { long s = (parent == null) ? state : reconcileState(); int counts = (int)s; int parties = counts >>> PARTIES_SHIFT; int unarrived = counts & UNARRIVED_MASK; if (registrations > MAX_PARTIES - parties) throw new IllegalStateException(badRegister(s)); phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) break; if (counts != EMPTY) { // not 1st registration if (parent == null || reconcileState() == s) { if (unarrived == 0) // wait out advance root.internalAwaitAdvance(phase, null); else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s + adjust)) break; } } else if (parent == null) { // 1st root registration long next = ((long)phase << PHASE_SHIFT) | adjust; if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next)) break; } else { synchronized (this) { // 1st sub registration if (state == s) { // recheck under lock phase = parent.doRegister(1); if (phase < 0) break; // finish registration whenever parent registration // succeeded, even when racing with termination, // since these are part of the same "transaction". while (!UNSAFE.compareAndSwapLong (this, stateOffset, s, ((long)phase << PHASE_SHIFT) | adjust)) { s = state; phase = (int)(root.state >>> PHASE_SHIFT); // assert (int)s == EMPTY; } break; } } } } return phase; }
这个方法的作用是为phaser增加一个新的参与方,假如此时unarrived为0,并且onAdvance方法正在被调用,这个方法会阻塞直到phaser的那部分工作完成。如果这个phaser有一个父phaser,并且这个子phaser没有注册过对象,那么这个子phaser也会在它的父phaser上面注册。如果当前phaser已经结束,那么尝试去注册不会有任何影响并且一个负数会返回。
这个方法返回当前这个注册完成时的阶段数。如果注册过多参与者会抛出IllegalStateException异常。
详情如下:
调用doRegister(1),1为注册个数。
doRegister中,首先获得需要递增的值adjust,获得父phaser,然后进入循环。
首先调整state(状态值)(reconcileState),因为阶段数要以root中的为准,所以reconcileState方法会取得root中阶段数组合当前的paties和unarrived值成为新的state状态。
之后假如phase小于0,既phaser已经结束,返回负数。
假如需要注册个数过大,爬出异常。
假如当前counts不为空(parties或者unarrived不为空),那么当parent不为空或者状态数state稳定(阶段数不变)时,假如发现unarrived为0则等待(被状态数改变之后唤醒然后重试),否则尝试调整当前的state状态值。
假如counts为空并且parent为空,则尝试直接改变当前的状态值state(即为调整parties和unarrived值)。
假如counts为空并且parent不为空,那么说明不仅需要改变当前的state,还需要去父phaser中注册,这里使用内置同步块的方式,当phase小于0时返回,在成功调整state之后跳出返回阶段值。
我们接着再来看arriveAndAwaitAdvance(抵达并等待)操作:
public int arriveAndAwaitAdvance() { // Specialization of doArrive+awaitAdvance eliminating some reads/paths final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s -= ONE_ARRIVAL)) { if (unarrived > 1) return root.internalAwaitAdvance(phase, null); if (root != this) return parent.arriveAndAwaitAdvance(); long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n)) return (int)(state >>> PHASE_SHIFT); // terminated releaseWaiters(phase); return nextPhase; } } }
这个方法在参与者注册之后被调用的,作为是声明自己抵达改phaser并等待其他参与者抵达。等价于awaitAdvance(arrive()),如果你想要使用可中断或者超时的方式来等待,那么你可以使用相似的方法通过另外的形式来调用awaitAdvance来达到目的。如果你还需要再抵达之后撤销,那么可以使用awaitAdvance(arriveAndDeregister())。这个方法不能被没有注册的参与方调用,否则可能抛出IllegalStateException异常。
详情如下:
首先取得跟phaser,进入循环。
可能调整并取得最新的状态数state(可能会更新phase)。
取得阶段数(phase),小于0则返回(结束)。
取得counts(该值为当前parties和unarrived的或操作,若为EMPTY说明当前phaser上无任何参与方)。
取得unarrived值,小于或等于0抛出IllegalStateException异常(说明有未注册的一方调用了此方法)。
试着CAS改变state的值(仅unarrived的部分),失败重试。
成功之后,根据unarrived判断,假如>1说明不是最后一个抵达的参与方,所以调用internalAwaitAdvance方法在root上等待。
否则,说明当前是改phaser的最后一个抵达的参与方,那么假如root!=this(当前phaser不是根),就调用parent(父phaser)的arriveAndAwaitAdvance方法(同名方法)。
否则当前phaser就为根,那么保留注册数n(s & PARTIES_MASK ),根据可扩展的onAdvance方法、以及当前注册数以及递增之后的阶段数(phase)来CAS构造新的状态值(state)。这一步若失败说明有外部的终止调用,直接返回后32位(最高位1)即可。
成功之后返回新的状态值。
/** * Main implementation for methods arrive and arriveAndDeregister. * Manually tuned to speed up and minimize race windows for the * common case of just decrementing unarrived field. * * @param adjust value to subtract from state; * ONE_ARRIVAL for arrive, * ONE_DEREGISTER for arriveAndDeregister */ private int doArrive(int adjust) { final Phaser root = this.root; for (;;) { long s = (root == this) ? state : reconcileState(); int phase = (int)(s >>> PHASE_SHIFT); if (phase < 0) return phase; int counts = (int)s; int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK); if (unarrived <= 0) throw new IllegalStateException(badArrive(s)); if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) { if (unarrived == 1) { long n = s & PARTIES_MASK; // base of next state int nextUnarrived = (int)n >>> PARTIES_SHIFT; if (root == this) { if (onAdvance(phase, nextUnarrived)) n |= TERMINATION_BIT; else if (nextUnarrived == 0) n |= EMPTY; else n |= nextUnarrived; int nextPhase = (phase + 1) & MAX_PHASE; n |= (long)nextPhase << PHASE_SHIFT; UNSAFE.compareAndSwapLong(this, stateOffset, s, n); releaseWaiters(phase); } else if (nextUnarrived == 0) { // propagate deregistration phase = parent.doArrive(ONE_DEREGISTER); UNSAFE.compareAndSwapLong(this, stateOffset, s, s | EMPTY); } else phase = parent.doArrive(ONE_ARRIVAL); } return phase; } } }
doArrive主要被arrive和arriveAndDeregister来调用,与arriveAndAwaitAdvance的代码非常类似(所以不给出详情),主要的区别在于:
doArrive中可能会有撤销操作,而arriveAndAwaitAdvacne中只是抵达操作,所以当nextUnarrived(PARTIES)的值为0时,需要对父phaser也发起传递的撤销操作。
arriveAndAwaitAdvance中只需要调用父phaser的同名方法,但是doArrive中会根据nextUnarrived的值来区分调用父phaser的同名方法时的参数
存在的问题:
相关文章推荐
- Java Thread&Concurrency(15): 深入理解ScheduledThreadPoolExecutor及其实现原理
- Java Thread&Concurrency(16): 深入理解ArrayBlockingQueue及其实现原理
- Java Thread&Concurrency(12): 深入理解AbstractExecutorService及其实现原理
- Java Thread&Concurrency(4): 深入理解Exchanger实现原理
- Java Thread&Concurrency(9): 深入理解StampedLock及其实现原理
- Java Thread&Concurrency(14): 深入理解条件队列(Condition)及其实现原理
- Java Thread&Concurrency(10): 深入理解ThreadLocal及其实现原理
- Java Thread&Concurrency(2): 深入理解ConcurrentSkipListMap实现原理
- Java Thread&Concurrency(11): 深入理解ThreadPoolExecutor及其实现原理
- Java Thread&Concurrency(13): 深入理解ConcurrentLinkedQueue及其实现原理
- Java Thread&Concurrency(7): 深入理解Callable/Future(FutureTask)接口及其实现
- Java Thread&Concurrency(8): 深入理解CompletionService接口及其实现
- Java Thread&Concurrency(1): 深入理解Fork-Join并发执行框架
- Java Thread&Concurrency(3): 深入理解SynchronousQueue实现原理
- java 中HashMap实现原理深入理解
- 深入理解Java并发之synchronized实现原理
- 深入理解Java中的HashMap的实现原理
- 深入理解Java并发之synchronized实现原理
- 深入理解Java并发之synchronized实现原理
- 【深入理解java集合系列】HashSet实现原理