您的位置:首页 > 编程语言 > Java开发

实战Java高并发程序设计之CyclicBarrier

2017-06-19 22:19 302 查看
CyclicBarrier是另一种多线程并发控制使用工具.和CountDownLatch非常的相似,它也可以实现线程间的计数等待,但它的功能比CountDownLatch更加复杂且强大.

CyclicBarrier可以理解成循环栅栏.栅栏就是一种阻碍物.这里是用来阻止线程继续执行,要求线程在栅栏处等待.

前面Cyclic意为循环,也就是说这个计数器可以反复使用.

CyclicBarrier和CountDownLatch的区别

两个看上去有点像的类,都在java.util.concurrent下,都可以用来表示代码运行到某个点上,二者的区别在于:

(1)CyclicBarrier的某个线程运行到某个点上之后,该线程即停止运行,直到所有的线程都到达了这个点,所有线程才重新运行;

    CountDownLatch则不是,某线程运行到某个点上之后,只是给某个数值-1而已,该线程继续运行

(2)CyclicBarrier只能唤起一个任务,CountDownLatch可以唤起多个任务

(3)CyclicBarrier可重用,CountDownLatch不可重用,计数值为0该CountDownLatch就不可再用了

(4)CyclicBarrier是又重入锁实现,CountDownLatch是又AQS直接实现.

CyclicBarrier 源码实现:

public class CyclicBarrier {
/**
* 每次对栅栏的使用可以表示为一个generation实例。
* 栅栏每次开放或者重置, generation都会发生改变。
* 使用栅栏的线程可以关联多个generations, 由于等待线程可能会以多种方式请求锁,
* 但是在特定的时间只有一个是可用的,其他的要么被打破,要么开放。
* 如果一个栅栏已经被打破。且没有后续的重置动作,那么可以不存在可用的generation。
*/
private static class Generation {
boolean broken = false;
}

/** 使用重入锁保护栅栏 */
private final ReentrantLock lock = new ReentrantLock();
/** 栅栏开放的条件 */
private final Condition trip = lock.newCondition();
/** 表示当前使用栅栏的使用方(线程)数量 */
private final int parties;
/* 当栅栏开放以后要执行的动作 */
private final Runnable barrierCommand;
/** 当前generation */
private Generation generation = new Generation();

/**
* 处于等待状态的使用方(线程)的数量,在每一个generation上从
* parties递减为0。当新建generation(栅栏开放)或者栅栏被打破
* 时,重置为parties。
*/
private int count;
使用了重入锁实现,使用Condition来实现开放条件.所有的请求都要在该条件上等待,直到被打破

当当前generation的参与者全都到达栅栏以后,栅栏会唤醒所有在栅栏处等待的线程,栅栏开放.会重置CyclicBarrier

/**
* 更新障碍上的状态,并且唤醒所有在栅栏处等待的线程
* 只有持有该锁时候才会被调用
*/
private void nextGeneration() {
//唤醒所有在栅栏处等待的线程
trip.signalAll();
// 设置下一个generation
count = parties;
generation = new Generation();
}


当发生异常或者中断的时候,设置当前栅栏为打破:

/**
* 设置当前栅栏generation状态为打破状态,并唤醒栅栏处的等待线程。
* 这个方法只有在持有锁的情况下被调用。
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}


主要代码:

/**
* 栅栏主体代码,涵盖了所有情况。
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前generation实例
final Generation g = generation;
//如果为broken,则抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
//如果线程中断,调用上面的breakBarrier()方法,并抛出中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//计算到达线程的下标
int index = --count;
if (index == 0) {  // 如果为0,栅栏开放
//任务标志.如果有任务执行命令,设置为ture
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//如果有命令,则执行命令
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
//没命令时,直接结束当前generation
if (!ranAction)
breakBarrier();
}
}

// 等待中的主循环,直到栅栏开放、栅栏被打破、线程被打断或者超时时退出。
for (;;) {
try {
//时间没到级继续等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果是当前generation 且generation状态为未打破,那么打破栅栏。
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//即使我们没有中断,我们即将完成等待,所以这个中断被视为“属于”下一次执行的中断
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();
//如果generation改变了,说明之前的栅栏已经开放,返回index。
if (g != generation)
return index;
//如果超时,打破栅栏,并返回超时异常。
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}


等待所有参与者在此栅栏上调用await方法
 如果当前线程不是最后一个线程,那么它将被禁止进行线程调度,并处于休眠状态,直到发生下列事情之一:

     * 1.最后一个线程到达; 

     * 2.当前线程以外的一些其他线程中断;

     * 3.一些其他线程中断

     * 4.一些其他线程在等待栅栏时超时; 

     * 5.其他线程在此栅栏上调用reset()方法.

/**
* 如果当前线程是到达的最后一个线程,并且在构造函数中提供非空栅栏操作,则当前线程在允许其他线程继续之前运行该动作.
* 如果在栅栏动作期间发生异常,则该异常将在当前线程中传播,并且屏障置于断开状态.
*
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

/**
* 比上面多了超时
*/
public int awa
b32e
it(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}


其他方法:

/**
* 构造函数
* @param   栅栏开放之前,参与者的线程必须调用await方法等待
* @param 	当栅栏开放以后,执行的指令。如果为空,表示没有动作
* @throws 	当参与者少于1时,报IllegalArgumentException
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

/**
* 不执行动作
*/
public CyclicBarrier(int parties) {
this(parties, null);
}

/**
* 返回栅栏开放所需要的参与者数量.
*/
public int getParties() {
return parties;
}

/**
* 查询这个栅栏是否处于broken状态.
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

/**
* 将栅栏重置为初始状态.
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//重置逻辑,先打破当前的栅栏
breakBarrier();
//然后建立一个新的。
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

/**
* 获取在栅栏处等待的使用方(线程)数量。
* 主要用于测试
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}


书中demo:

public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclic;
Soldier(CyclicBarrier cyclic, String soldierName) {
this.cyclic = cyclic;
this.soldier = soldierName;
}
public void run() {
try {
//等待所有士兵到齐
cyclic.await();
doWork();
//等待所有士兵完成任务
cyclic.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ":任务完成");
}
}

public static class BarrierRun implements Runnable {
boolean flag;
int N;
public BarrierRun(boolean flag, int N) {
this.flag = flag;
this.N = N;
}
public void run() {
if (flag) {
System.out.println("司令:[士兵" + N + "个,任务完成!]");
} else {
System.out.println("司令:[士兵"  + N + "集合完毕]");
flag = true;
}
}
}

public static void main(String args[]) throws InterruptedException {
final int N = 10;
Thread[] allSoldier=new Thread
;
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
//设置屏障点,主要是为了执行这个方法
System.out.println("队伍集合");
for (int i = 0; i < N; ++i) {
System.out.println("士兵 "+i+" 报道");
allSoldier[i]=new Thread(new Soldier(cyclic, "士兵 " + i));
allSoldier[i].start();
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: