您的位置:首页 > 编程语言 > Java开发

Java并发学习(十六)-并发工具CyclicBarrier

2017-12-09 23:29 736 查看

What is CyclicBarrier

从名字来看,barrier:栅栏,cyclic:循环的。

简单点说,它能够让一组数量线程在某一个时间点等待,当所有线程都到了,再让它们运行。

例如有这样一个情景,有n个excel表格数据,你需要获取他们所有的数据后,再进行下一步运算。利用CyclicBarrier来做,首先需要开n个线程,等这n个线程都完成到达这个点,再进行下一步运算。

首先看它的内部构造:

CyclicBarrier结构

记得开始讲ReentrantLock的时候,说道Condition这个类,它和普通的wait和notify有区别,因为它能够唤醒一组线程,这里可以参看: Java并发学习(十二)-ReentrantLock分析

所以这里思路就简单了,没错,利用ReentranLock和Condition来实现。

public class CyclicBarrier {
/**
* 每一个barrier,都代表这一个Generation实例。当barrier损坏或者重置是,generation会改变。
*/
private static class Generation {
boolean broken = false;
}

// 利用可重入锁,reentrantlock,守护栅栏的进入
private final ReentrantLock lock = new ReentrantLock();
//等待,知道被损坏。
private final Condition trip = lock.newCondition();
//代表进入数量
private final int parties;
//当所有线程到达时候,需要做的事。
private final Runnable barrierCommand;
..
}


简单列举出几点:

从cyclic知道,一个Cyclic是可以循环使用的,就好比去银行换整钞,到100你就能换一张1百的,可以循环利用。这个是通过
Generation
这个静态内部类来实现的。

ReentrantLock
Condition
来控制挂起以及唤醒操作。

可以自定义
barrierCommand
,即当所有线程都到达时,先执行
barrierCommand
线程,在唤醒所有等待线程。具体分到时间片则有区别。

构造方法

CyclicBarrier(int parties):当给定数量的参与者(线程)等待它时,它将跳闸,自动唤醒所有线程并运行。

CyclicBarrier(int parties, Runnable barrierAction):当给定数量的参与者(线程)等待它时将会跳闸,并首先执行barrierAction线程。然后唤醒其他线程,这些由最后进来的线程执行。

接下来分析等待代码。

await方法

当需要利用CyclicBarrier进行等待时候,调用await方法:

public int await() throws InterruptedException, BrokenBarrierException {
try {
//强行等待。不带超时
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}


再看dowait方法:

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//获取generation
final Generation g = generation;
//如果g,被破坏了,也就是为true了,那么就报错。
if (g.broken)
throw new BrokenBarrierException();

//如果线程中断了。当前线程,那么也报错。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

//先自减。然后看是否需要运行command
//也就是不用把最后一个也await了,直接让他运行。
int index = --count;
if (index == 0) {  // tripped  绊倒。
//都到了,开始运行。
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//更新generation,并唤醒所有线程。
nextGeneration();
return 0;
} finally {
if (!ranAction)
//打破栅栏,让所有线程都运行。
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//自旋,直到到齐运行,generation被破坏,中断,或者超时。
for (;;) {
try {
//等待,有超时的等待,以及普通无限等待等。
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//报错或者中断
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
//判断其他情况
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁。
lock.unlock();
}
}


代码虽长,但是理解还是比较简单的,重要注释写在代码里面了,这里讲讲具体思路:

1. 首先等待操作加锁操作。

2. 判断generation是否改变,是否中断等。

3. 将count自减操作,如果已经满足开启栅栏要求,则不对最后一个线程进行await操作,直接signalAll其他线程。

4. 检查完后,则进行自旋操作,直到到齐运行,generation被破坏,中断,或者超时。

5. 最后释放锁。

例子

public class CyclicBarrierTest {
private static CyclicBarrier cyclicBarrier;

static class CyclicBarrierThread extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName() + "我走完了run了");
// 等待
try {
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
cyclicBarrier = new CyclicBarrier(3, new Runnable() {
public void run() {
System.out.println("大家都走完了");
}
});
for (int i = 0; i < 3; i++) {
new CyclicBarrierThread().start();
}
}
}


输出结果如下:



参考资料:

1. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CyclicBarrier.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐