您的位置:首页 > 编程语言 > Java开发

Java并发之工具类CyclicBarrier

2017-01-05 00:28 871 查看
CyclicBarrier是java并发的工具类,它允许一组线程互相等待,直到到达某个公共屏障点。因为该barrier 在释放等待线程后可以重用,所以称它为循环的barrier。CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态。



看一下使用实例

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

class CBTask extends Thread
{
private CyclicBarrier cb;
public CBTask(CyclicBarrier cb, String name){
this.cb = cb;
setName(name);
}
public void run()
{
System.out.println(getName()+" enter wait ");
try
{
cb.await();
System.out.println(getName()+" continue");
}catch(Exception e){
e.printStackTrace();
}
}
}
public class CyclicBarrierTest
{
public static void main(String[] args)
{
CyclicBarrier cb = new CyclicBarrier(3, new Runnable()
{
public void run()
{
System.out.println(Thread.currentThread().getName()+" game over!!!");
}
});
for(int i=0;  i<3; i++){
Thread t = new CBTask(cb, "once"+i);
t.start();
}
try
{
TimeUnit.MILLISECONDS.sleep(100);
} catch (Exception e)
{
e.printStackTrace();
}
System.out.println("第一次使用完!!!");
for(int i=0; i<3; i++){
Thread t = new CBTask(cb, "twice"+i);
t.start();
}
System.out.println("twice over");
}
}


执行中某一次的结果

once1 enter wait
once0 enter wait
once2 enter wait
once2 game over!!!
once2 continue
once1 continue
once0 continue
第一次使用完!!!
twice0 enter wait
twice over
twice1 enter wait
twice2 enter wait
twice2 game over!!!
twice2 continue
twice1 continue
twice0 continue


看看里面的原理是如何实现的

1.构造方法

public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}


2.属性

/**保护屏障锁*/
private final ReentrantLock lock = new ReentrantLock();
/** 等待状态 */
private final Condition trip = lock.newCondition();
/** 参数线程数量 */
private final int parties;
/* 最后线程运行的操作 */
private final Runnable barrierCommand;
/**当前这一代的内部类 */
private Generation generation = new Generation();
/**等待的数量*/
private int count;
//静态内部类,broken表示这代是否被打破
private static class Generation {
boolean broken = false;
}


3.dowait方法(await方法调用)

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前代
final Generation g = generation;
//是否被打破
if (g.broken)
throw new BrokenBarrierException();
//是否中断
if (Thread.interrupted()) {
//唤醒所有等待,设置为这一代设置为打破状态
breakBarrier();
throw new InterruptedException();
}

int index = --count;//等待线程自减
if (index == 0) {  // tripped
boolean ranAction = false;//设置动作
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//调用动作
ranAction = true;
nextGeneration();//重置下一代
return 0;
} finally {
if (!ranAction)//出现异常,退出这一代
breakBarrier();
}
}

// 循环直到绊倒, 打破,中断或者超时
for (;;) {
try {
if (!timed)//没有设置时间
trip.await();//等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//不能处理
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}


4.breakBarrier方法

private void breakBarrier() {
generation.broken = true;//设置为打破
count = parties;
trip.signalAll();//唤醒所有等待线程
}


5.signalAll方法

public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;//获取第一个等待节点
if (first != null)
doSignalAll(first);
}


6.doSignalAll方法

private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
//循环将所有的节点取出
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* 如果不能改变状态,节点将会被取消
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
*将节点设置到AQS等待队列中
*/
Node p = enq(node);
int ws = p.waitStatus;
//如果状态大于0,状态不等于SIGNAL唤醒节点线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
//添加节点,加载末尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


7.nextGeneration准备下一代

//重置下一代
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}


8.reset()重置

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier();   // 打破当前一代
nextGeneration(); //开启新的一代
} finally {
lock.unlock();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 并发