实战Java高并发程序设计之CyclicBarrier
2017-06-19 22:19
302 查看
CyclicBarrier是另一种多线程并发控制使用工具.和CountDownLatch非常的相似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大.
CyclicBarrier可以理解成循环栅栏.栅栏就是一种阻碍物.这里是用来阻止线程继续执行,要求线程在栅栏处等待.
前面Cyclic意为循环,也就是说这个计数器可以反复使用.
CyclicBarrier和CountDownLatch的区别
两个看上去有点像的类,都在java.util.concurrent下,都可以用来表示代码运行到某个点上,二者的区别在于:
(1)CyclicBarrier的某个线程运行到某个点上之后,该线程即停止运行,直到所有的线程都到达了这个点,所有线程才重新运行;
CountDownLatch则不是,某线程运行到某个点上之后,只是给某个数值-1而已,该线程继续运行
(2)CyclicBarrier只能唤起一个任务,CountDownLatch可以唤起多个任务
(3)CyclicBarrier可重用,CountDownLatch不可重用,计数值为0该CountDownLatch就不可再用了
(4)CyclicBarrier是又重入锁实现,CountDownLatch是又AQS直接实现.
CyclicBarrier 源码实现:
当当前generation的参与者全都到达栅栏以后,栅栏会唤醒所有在栅栏处等待的线程,栅栏开放.会重置CyclicBarrier
当发生异常或者中断的时候,设置当前栅栏为打破:
主要代码:
等待所有参与者在此栅栏上调用await方法
如果当前线程不是最后一个线程,那么它将被禁止进行线程调度,并处于休眠状态,直到发生下列事情之一:
* 1.最后一个线程到达;
* 2.当前线程以外的一些其他线程中断;
* 3.一些其他线程中断
* 4.一些其他线程在等待栅栏时超时;
* 5.其他线程在此栅栏上调用reset()方法.
其他方法:
书中demo:
CyclicBarrier可以理解成循环栅栏.栅栏就是一种阻碍物.这里是用来阻止线程继续执行,要求线程在栅栏处等待.
前面Cyclic意为循环,也就是说这个计数器可以反复使用.
CyclicBarrier和CountDownLatch的区别
两个看上去有点像的类,都在java.util.concurrent下,都可以用来表示代码运行到某个点上,二者的区别在于:
(1)CyclicBarrier的某个线程运行到某个点上之后,该线程即停止运行,直到所有的线程都到达了这个点,所有线程才重新运行;
CountDownLatch则不是,某线程运行到某个点上之后,只是给某个数值-1而已,该线程继续运行
(2)CyclicBarrier只能唤起一个任务,CountDownLatch可以唤起多个任务
(3)CyclicBarrier可重用,CountDownLatch不可重用,计数值为0该CountDownLatch就不可再用了
(4)CyclicBarrier是又重入锁实现,CountDownLatch是又AQS直接实现.
CyclicBarrier 源码实现:
public class CyclicBarrier { /** * 每次对栅栏的使用可以表示为一个generation实例。 * 栅栏每次开放或者重置, generation都会发生改变。 * 使用栅栏的线程可以关联多个generations, 由于等待线程可能会以多种方式请求锁, * 但是在特定的时间只有一个是可用的,其他的要么被打破,要么开放。 * 如果一个栅栏已经被打破。且没有后续的重置动作,那么可以不存在可用的generation。 */ private static class Generation { boolean broken = false; } /** 使用重入锁保护栅栏 */ private final ReentrantLock lock = new ReentrantLock(); /** 栅栏开放的条件 */ private final Condition trip = lock.newCondition(); /** 表示当前使用栅栏的使用方(线程)数量 */ private final int parties; /* 当栅栏开放以后要执行的动作 */ private final Runnable barrierCommand; /** 当前generation */ private Generation generation = new Generation(); /** * 处于等待状态的使用方(线程)的数量,在每一个generation上从 * parties递减为0。当新建generation(栅栏开放)或者栅栏被打破 * 时,重置为parties。 */ private int count;使用了重入锁实现,使用Condition来实现开放条件.所有的请求都要在该条件上等待,直到被打破
当当前generation的参与者全都到达栅栏以后,栅栏会唤醒所有在栅栏处等待的线程,栅栏开放.会重置CyclicBarrier
/** * 更新障碍上的状态,并且唤醒所有在栅栏处等待的线程 * 只有持有该锁时候才会被调用 */ private void nextGeneration() { //唤醒所有在栅栏处等待的线程 trip.signalAll(); // 设置下一个generation count = parties; generation = new Generation(); }
当发生异常或者中断的时候,设置当前栅栏为打破:
/** * 设置当前栅栏generation状态为打破状态,并唤醒栅栏处的等待线程。 * 这个方法只有在持有锁的情况下被调用。 */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
主要代码:
/** * 栅栏主体代码,涵盖了所有情况。 */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //获取当前generation实例 final Generation g = generation; //如果为broken,则抛出BrokenBarrierException异常 if (g.broken) throw new BrokenBarrierException(); //如果线程中断,调用上面的breakBarrier()方法,并抛出中断异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //计算到达线程的下标 int index = --count; if (index == 0) { // 如果为0,栅栏开放 //任务标志.如果有任务执行命令,设置为ture boolean ranAction = false; try { final Runnable command = barrierCommand; //如果有命令,则执行命令 if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { //没命令时,直接结束当前generation if (!ranAction) breakBarrier(); } } // 等待中的主循环,直到栅栏开放、栅栏被打破、线程被打断或者超时时退出。 for (;;) { try { //时间没到级继续等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //如果是当前generation 且generation状态为未打破,那么打破栅栏。 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //即使我们没有中断,我们即将完成等待,所以这个中断被视为“属于”下一次执行的中断 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); //如果generation改变了,说明之前的栅栏已经开放,返回index。 if (g != generation) return index; //如果超时,打破栅栏,并返回超时异常。 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁 lock.unlock(); } }
等待所有参与者在此栅栏上调用await方法
如果当前线程不是最后一个线程,那么它将被禁止进行线程调度,并处于休眠状态,直到发生下列事情之一:
* 1.最后一个线程到达;
* 2.当前线程以外的一些其他线程中断;
* 3.一些其他线程中断
* 4.一些其他线程在等待栅栏时超时;
* 5.其他线程在此栅栏上调用reset()方法.
/** * 如果当前线程是到达的最后一个线程,并且在构造函数中提供非空栅栏操作,则当前线程在允许其他线程继续之前运行该动作. * 如果在栅栏动作期间发生异常,则该异常将在当前线程中传播,并且屏障置于断开状态. * */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } /** * 比上面多了超时 */ public int awa b32e it(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
其他方法:
/** * 构造函数 * @param 栅栏开放之前,参与者的线程必须调用await方法等待 * @param 当栅栏开放以后,执行的指令。如果为空,表示没有动作 * @throws 当参与者少于1时,报IllegalArgumentException */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * 不执行动作 */ public CyclicBarrier(int parties) { this(parties, null); } /** * 返回栅栏开放所需要的参与者数量. */ public int getParties() { return parties; } /** * 查询这个栅栏是否处于broken状态. */ public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } /** * 将栅栏重置为初始状态. */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //重置逻辑,先打破当前的栅栏 breakBarrier(); //然后建立一个新的。 nextGeneration(); // start a new generation } finally { lock.unlock(); } } /** * 获取在栅栏处等待的使用方(线程)数量。 * 主要用于测试 */ public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
书中demo:
public class CyclicBarrierDemo { public static class Soldier implements Runnable { private String soldier; private final CyclicBarrier cyclic; Soldier(CyclicBarrier cyclic, String soldierName) { this.cyclic = cyclic; this.soldier = soldierName; } public void run() { try { //等待所有士兵到齐 cyclic.await(); doWork(); //等待所有士兵完成任务 cyclic.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } void doWork() { try { Thread.sleep(Math.abs(new Random().nextInt()%10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(soldier + ":任务完成"); } } public static class BarrierRun implements Runnable { boolean flag; int N; public BarrierRun(boolean flag, int N) { this.flag = flag; this.N = N; } public void run() { if (flag) { System.out.println("司令:[士兵" + N + "个,任务完成!]"); } else { System.out.println("司令:[士兵" + N + "集合完毕]"); flag = true; } } } public static void main(String args[]) throws InterruptedException { final int N = 10; Thread[] allSoldier=new Thread ; boolean flag = false; CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N)); //设置屏障点,主要是为了执行这个方法 System.out.println("队伍集合"); for (int i = 0; i < N; ++i) { System.out.println("士兵 "+i+" 报道"); allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i)); allSoldier[i].start(); } } }
相关文章推荐
- 实战Java高并发程序设计之ReentrantLock(二)
- 实战Java高并发程序设计之Condition
- 【备忘】Java高并发程序设计实战视频教程
- 实战Java高并发程序设计之概念
- 【Java并发编程实战】-----“J.U.C”:CyclicBarrier
- 实战Java高并发程序设计之Java内存模型和线程安全
- 实战Java高并发程序设计之ReentrantLock(一)
- 【实战Java高并发程序设计】5:让普通变量也享受原子操作
- 实战Java高并发程序设计之多线程基础
- 实战Java高并发程序设计-04 Java并发包锁和其他工具的使用
- 实战Java高并发程序设计-02几个重要概念
- 【Java并发编程实战】—–“J.U.C”:CyclicBarrier
- 实战Java高并发程序设计-01历史概述
- 【Java并发编程实战】-----“J.U.C”:CyclicBarrier
- 【实战Java高并发程序设计 5】让普通变量也享受原子操作
- 实战Java高并发程序设计-03 Java线程
- 实战Java高并发程序设计-06 Java并发包并发集合
- 实战Java高并发程序设计-07原子操作
- Java 并发专题 : CyclicBarrier 打造一个安全的门禁系统
- java高并发程序设计总结四:JDK并发包之信号量Semaphore