Java并发学习(十六)-并发工具CyclicBarrier
2017-12-09 23:29
736 查看
What is CyclicBarrier
从名字来看,barrier:栅栏,cyclic:循环的。简单点说,它能够让一组数量线程在某一个时间点等待,当所有线程都到了,再让它们运行。
例如有这样一个情景,有n个excel表格数据,你需要获取他们所有的数据后,再进行下一步运算。利用CyclicBarrier来做,首先需要开n个线程,等这n个线程都完成到达这个点,再进行下一步运算。
首先看它的内部构造:
CyclicBarrier结构
记得开始讲ReentrantLock的时候,说道Condition这个类,它和普通的wait和notify有区别,因为它能够唤醒一组线程,这里可以参看: Java并发学习(十二)-ReentrantLock分析 。所以这里思路就简单了,没错,利用ReentranLock和Condition来实现。
public class CyclicBarrier { /** * 每一个barrier,都代表这一个Generation实例。当barrier损坏或者重置是,generation会改变。 */ private static class Generation { boolean broken = false; } // 利用可重入锁,reentrantlock,守护栅栏的进入 private final ReentrantLock lock = new ReentrantLock(); //等待,知道被损坏。 private final Condition trip = lock.newCondition(); //代表进入数量 private final int parties; //当所有线程到达时候,需要做的事。 private final Runnable barrierCommand; .. }
简单列举出几点:
从cyclic知道,一个Cyclic是可以循环使用的,就好比去银行换整钞,到100你就能换一张1百的,可以循环利用。这个是通过
Generation这个静态内部类来实现的。
ReentrantLock和
Condition来控制挂起以及唤醒操作。
可以自定义
barrierCommand,即当所有线程都到达时,先执行
barrierCommand线程,在唤醒所有等待线程。具体分到时间片则有区别。
构造方法
CyclicBarrier(int parties):当给定数量的参与者(线程)等待它时,它将跳闸,自动唤醒所有线程并运行。CyclicBarrier(int parties, Runnable barrierAction):当给定数量的参与者(线程)等待它时将会跳闸,并首先执行barrierAction线程。然后唤醒其他线程,这些由最后进来的线程执行。
接下来分析等待代码。
await方法
当需要利用CyclicBarrier进行等待时候,调用await方法:public int await() throws InterruptedException, BrokenBarrierException { try { //强行等待。不带超时 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
再看dowait方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //加锁 lock.lock(); try { //获取generation final Generation g = generation; //如果g,被破坏了,也就是为true了,那么就报错。 if (g.broken) throw new BrokenBarrierException(); //如果线程中断了。当前线程,那么也报错。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //先自减。然后看是否需要运行command //也就是不用把最后一个也await了,直接让他运行。 int index = --count; if (index == 0) { // tripped 绊倒。 //都到了,开始运行。 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; //更新generation,并唤醒所有线程。 nextGeneration(); return 0; } finally { if (!ranAction) //打破栅栏,让所有线程都运行。 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out //自旋,直到到齐运行,generation被破坏,中断,或者超时。 for (;;) { try { //等待,有超时的等待,以及普通无限等待等。 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //报错或者中断 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //判断其他情况 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //释放锁。 lock.unlock(); } }
代码虽长,但是理解还是比较简单的,重要注释写在代码里面了,这里讲讲具体思路:
1. 首先等待操作加锁操作。
2. 判断generation是否改变,是否中断等。
3. 将count自减操作,如果已经满足开启栅栏要求,则不对最后一个线程进行await操作,直接signalAll其他线程。
4. 检查完后,则进行自旋操作,直到到齐运行,generation被破坏,中断,或者超时。
5. 最后释放锁。
例子
public class CyclicBarrierTest { private static CyclicBarrier cyclicBarrier; static class CyclicBarrierThread extends Thread { public void run() { System.out.println(Thread.currentThread().getName() + "我走完了run了"); // 等待 try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { cyclicBarrier = new CyclicBarrier(3, new Runnable() { public void run() { System.out.println("大家都走完了"); } }); for (int i = 0; i < 3; i++) { new CyclicBarrierThread().start(); } } }
输出结果如下:
参考资料:
1. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CyclicBarrier.html
相关文章推荐
- Java并发学习之十八——线程同步工具之CyclicBarrier
- 《Java 7 并发编程指南》学习概要 (3)Semaphore, CountDownLatch, CyclicBarrier , Phaser, Exchanger
- java并发编程学习:如何等待多个线程执行完成后再继续后续处理(synchronized、join、FutureTask、CyclicBarrier)
- JAVA中的并发工具类(二)---同步屏障类CyclicBarrier
- java并发工具CyclicBarrier的理解和使用
- Java并发学习之十六——线程同步工具之信号量(Semaphores)
- Java中的并发工具CountDownLatch、CyclicBarrier、Semapphore使用详解
- java并发编程学习:如何等待多个线程执行完成后再继续后续处理(synchronized、join、FutureTask、CyclicBarrier)
- Java并发编程深入学习——CountDownLatch、CyclicBarrier和Semaphore
- Java并发学习笔记(四)-栅栏CyclicBarrier
- [置顶] java 并发编程实战书籍学习 第五章,CountDownLatch,FutureTask,CyclicBarrier,Semaphore学习
- [Java并发包学习五]CountDownLatch和CyclicBarrier介绍
- 【java并发】线程同步工具CyclicBarrier的使用
- Java并发学习笔记(15)信号量(Semaphore) 关卡((2)CyclicBarrier)
- 使用java并发工具栅栏(CyclicBarrier)实现多线程等待,同一时刻执行共同任务
- Java并发之CyclicBarrier的学习
- Java 多线程协调工具 CyclicBarrier 与CountDownLatch 学习
- [Java并发包学习五]CountDownLatch和CyclicBarrier介绍
- Java并发编程之栅栏(CyclicBarrier)详解
- java 中间件学习4-CountDownLatch、CyclicBarrier、Future和FutureTask