您的位置:首页 > 编程语言 > Java开发

Java并发Concurrent包的锁(五)——CyclicBarrier源码分析及使用

2018-03-07 17:52 525 查看
CyclicBarrier 是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。

属性

// 每一代的类,默认barrier是不损坏的
private static class Generation {
boolean broken = false;
}

// 锁对象
private final ReentrantLock lock = new ReentrantLock();
// condition
private final Condition trip = lock.newCondition();
// 表示拦截线程的数量
private final int parties;
// 达到条件时触发的操作
private final Runnable barrierCommand;
// 当前代
private Generation generation = new Generation();

// 还未到达屏障的线程数量,为0时说明都到达,可以触发屏障的附加指令
private int count;


构造函数

创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}


创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

public CyclicBarrier(int parties) {
this(parties, null);
}


await

线程调用 await 来进行等待,实际调用 dowait 方法。

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}


关键方法

dowait

本类中最关键的方法。

基本过程:

1. 如果当前线程不是最后一个线程,调用该方法,那么就在这里一直等待;

2. 直到最后一个线程调用该方法,执行附加指令后,通知所有其他线程。

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 可重入锁对象
final ReentrantLock lock = this.lock;
// 当前线程加锁
lock.lock();
try {
// 分代
final Generation g = generation;
// 如果分代损坏,抛出屏障损坏异常
if (g.broken)
throw new BrokenBarrierException();
// 如果线程中断了,中止barrier,唤醒所有线程并抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 正常情况下
// index为count减1后的值
int index = --count;
// 如果index为0,说明所有线程都已经到位
if (index == 0) {
// 执行附加执行的标记
boolean ranAction = false;
try {
// 附加指令如果不为null的话,就需要执行附加指令
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 执行完后置为true
ranAction = true;
// 本代完成,设置下一代
nextGeneration();
return 0;
} finally {
// 这里如果执行附加命令出问题了,中止barrier
if (!ranAction)
breakBarrier();
}
}

// index不为0的情况,说明当前线程不是要到位的最后一个线程
// 所以一直循环等待,直到收到唤醒信号
for (;;) {
try {
// timed默认为false,不是超时等待,所以这里执行Condition.await方法
// 当前线程一直等待直到被唤醒
if (!timed)
trip.await();
// 否则如果纳秒参数大于0,执行trip.awaitNanos方法
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}


breakBarrier

// 中止barrier
private void breakBarrier() {
// 将broken置为true
generation.broken = true;
// 重置count值为parties
count = parties;
// 唤醒所有的等待线程
trip.signalAll();
}


nextGeneration

private void nextGeneration() {
// 上一代完成,唤醒所有线程
trip.signalAll();
// 重置count,设置新的一代
count = parties;
generation = new Generation();
}


reset

将屏障重置为其初始状态。

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 中止当前barrier
breakBarrier();
// 产生新的一代
nextGeneration();
} finally {
lock.unlock();
}
}


使用情景

比如统计文件行数,多个线程,最终都读完加一个总和。

public class TestBarrier {

private static CyclicBarrier barrier;

public static class CountThread extends Thread {

@Override
public void run() {
// 业务逻辑
System.out.println("线程" + Thread.currentThread().getName() + "读完了!");
// 等待其它线程
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}

}

public static void main(String[] args) {

barrier = new CyclicBarrier(10, new Runnable() {

@Override
public void run() {
System.out.println("所有线程都读完了,可以合计总数了");
// 业务逻辑
}
});

for(int i=0; i<10; i++){
new CountThread().start();
}

}

}


运行结果:

线程Thread-2读完了!
线程Thread-0读完了!
线程Thread-1读完了!
线程Thread-3读完了!
线程Thread-4读完了!
线程Thread-6读完了!
线程Thread-5读完了!
线程Thread-7读完了!
线程Thread-8读完了!
线程Thread-9读完了!
所有线程都读完了,可以合计总数了
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: