Java并发Concurrent包的锁(五)——CyclicBarrier源码分析及使用
2018-03-07 17:52
525 查看
CyclicBarrier 是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
基本过程:
1. 如果当前线程不是最后一个线程,调用该方法,那么就在这里一直等待;
2. 直到最后一个线程调用该方法,执行附加指令后,通知所有其他线程。
运行结果:
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
属性
// 每一代的类,默认barrier是不损坏的 private static class Generation { boolean broken = false; } // 锁对象 private final ReentrantLock lock = new ReentrantLock(); // condition private final Condition trip = lock.newCondition(); // 表示拦截线程的数量 private final int parties; // 达到条件时触发的操作 private final Runnable barrierCommand; // 当前代 private Generation generation = new Generation(); // 还未到达屏障的线程数量,为0时说明都到达,可以触发屏障的附加指令 private int count;
构造函数
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
public CyclicBarrier(int parties) { this(parties, null); }
await
线程调用 await 来进行等待,实际调用 dowait 方法。public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
关键方法
dowait
本类中最关键的方法。基本过程:
1. 如果当前线程不是最后一个线程,调用该方法,那么就在这里一直等待;
2. 直到最后一个线程调用该方法,执行附加指令后,通知所有其他线程。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 可重入锁对象 final ReentrantLock lock = this.lock; // 当前线程加锁 lock.lock(); try { // 分代 final Generation g = generation; // 如果分代损坏,抛出屏障损坏异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,中止barrier,唤醒所有线程并抛出异常 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 正常情况下 // index为count减1后的值 int index = --count; // 如果index为0,说明所有线程都已经到位 if (index == 0) { // 执行附加执行的标记 boolean ranAction = false; try { // 附加指令如果不为null的话,就需要执行附加指令 final Runnable command = barrierCommand; if (command != null) command.run(); // 执行完后置为true ranAction = true; // 本代完成,设置下一代 nextGeneration(); return 0; } finally { // 这里如果执行附加命令出问题了,中止barrier if (!ranAction) breakBarrier(); } } // index不为0的情况,说明当前线程不是要到位的最后一个线程 // 所以一直循环等待,直到收到唤醒信号 for (;;) { try { // timed默认为false,不是超时等待,所以这里执行Condition.await方法 // 当前线程一直等待直到被唤醒 if (!timed) trip.await(); // 否则如果纳秒参数大于0,执行trip.awaitNanos方法 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
breakBarrier
// 中止barrier private void breakBarrier() { // 将broken置为true generation.broken = true; // 重置count值为parties count = parties; // 唤醒所有的等待线程 trip.signalAll(); }
nextGeneration
private void nextGeneration() { // 上一代完成,唤醒所有线程 trip.signalAll(); // 重置count,设置新的一代 count = parties; generation = new Generation(); }
reset
将屏障重置为其初始状态。public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 中止当前barrier breakBarrier(); // 产生新的一代 nextGeneration(); } finally { lock.unlock(); } }
使用情景
比如统计文件行数,多个线程,最终都读完加一个总和。public class TestBarrier { private static CyclicBarrier barrier; public static class CountThread extends Thread { @Override public void run() { // 业务逻辑 System.out.println("线程" + Thread.currentThread().getName() + "读完了!"); // 等待其它线程 try { barrier.await(); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) { barrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println("所有线程都读完了,可以合计总数了"); // 业务逻辑 } }); for(int i=0; i<10; i++){ new CountThread().start(); } } }
运行结果:
线程Thread-2读完了! 线程Thread-0读完了! 线程Thread-1读完了! 线程Thread-3读完了! 线程Thread-4读完了! 线程Thread-6读完了! 线程Thread-5读完了! 线程Thread-7读完了! 线程Thread-8读完了! 线程Thread-9读完了! 所有线程都读完了,可以合计总数了
相关文章推荐
- Java并发Concurrent包的锁(七)——Semaphore源码分析及使用
- Java并发Concurrent包的锁(六)——CountDownLatch源码分析及使用
- Java并发系列之CyclicBarrier源码分析
- Java中的并发工具CountDownLatch、CyclicBarrier、Semapphore使用详解
- Java并发实例之CyclicBarrier的使用
- Java并发编程 -- Concurrent包源码分析4 -- 各种锁与无锁
- Java并发Concurrent包的锁(四)——读写锁源码分析
- Java并发编程 -- Concurrent包源码分析4 -- 各种锁与无锁
- 【java并发】线程同步工具CyclicBarrier的使用
- 并发编程之 CyclicBarrier 源码分析
- [Java并发]使用CountDownLatch和CyclicBarrier等待多线程完成
- 使用java并发工具栅栏(CyclicBarrier)实现多线程等待,同一时刻执行共同任务
- Java并发包中CyclicBarrier的工作原理、使用示例
- Java并发包中CyclicBarrier的工作原理、使用示例
- java并发:CyclicBarrier的使用
- java.util.concurrent 下的Semaphore CyclicBarrier CountDownLatch 分析使用
- 使用Java辅助类(CountDownLatch、CyclicBarrier、Semaphore)并发编程
- java并发编程之——CountDownLatch和CyclicBarrier的使用
- Java - "JUC" CyclicBarrier源码分析
- java并发工具CyclicBarrier的理解和使用