Java并发之工具类CyclicBarrier
2017-01-05 00:28
871 查看
CyclicBarrier是java并发的工具类,它允许一组线程互相等待,直到到达某个公共屏障点。因为该barrier 在释放等待线程后可以重用,所以称它为循环的barrier。CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态。
看一下使用实例
执行中某一次的结果
看看里面的原理是如何实现的
1.构造方法
2.属性
3.dowait方法(await方法调用)
4.breakBarrier方法
5.signalAll方法
6.doSignalAll方法
7.nextGeneration准备下一代
8.reset()重置
看一下使用实例
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; class CBTask extends Thread { private CyclicBarrier cb; public CBTask(CyclicBarrier cb, String name){ this.cb = cb; setName(name); } public void run() { System.out.println(getName()+" enter wait "); try { cb.await(); System.out.println(getName()+" continue"); }catch(Exception e){ e.printStackTrace(); } } } public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrier cb = new CyclicBarrier(3, new Runnable() { public void run() { System.out.println(Thread.currentThread().getName()+" game over!!!"); } }); for(int i=0; i<3; i++){ Thread t = new CBTask(cb, "once"+i); t.start(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (Exception e) { e.printStackTrace(); } System.out.println("第一次使用完!!!"); for(int i=0; i<3; i++){ Thread t = new CBTask(cb, "twice"+i); t.start(); } System.out.println("twice over"); } }
执行中某一次的结果
once1 enter wait once0 enter wait once2 enter wait once2 game over!!! once2 continue once1 continue once0 continue 第一次使用完!!! twice0 enter wait twice over twice1 enter wait twice2 enter wait twice2 game over!!! twice2 continue twice1 continue twice0 continue
看看里面的原理是如何实现的
1.构造方法
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
2.属性
/**保护屏障锁*/ private final ReentrantLock lock = new ReentrantLock(); /** 等待状态 */ private final Condition trip = lock.newCondition(); /** 参数线程数量 */ private final int parties; /* 最后线程运行的操作 */ private final Runnable barrierCommand; /**当前这一代的内部类 */ private Generation generation = new Generation(); /**等待的数量*/ private int count; //静态内部类,broken表示这代是否被打破 private static class Generation { boolean broken = false; }
3.dowait方法(await方法调用)
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { //获取当前代 final Generation g = generation; //是否被打破 if (g.broken) throw new BrokenBarrierException(); //是否中断 if (Thread.interrupted()) { //唤醒所有等待,设置为这一代设置为打破状态 breakBarrier(); throw new InterruptedException(); } int index = --count;//等待线程自减 if (index == 0) { // tripped boolean ranAction = false;//设置动作 try { final Runnable command = barrierCommand; if (command != null) command.run();//调用动作 ranAction = true; nextGeneration();//重置下一代 return 0; } finally { if (!ranAction)//出现异常,退出这一代 breakBarrier(); } } // 循环直到绊倒, 打破,中断或者超时 for (;;) { try { if (!timed)//没有设置时间 trip.await();//等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { //不能处理 Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
4.breakBarrier方法
private void breakBarrier() { generation.broken = true;//设置为打破 count = parties; trip.signalAll();//唤醒所有等待线程 }
5.signalAll方法
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter;//获取第一个等待节点 if (first != null) doSignalAll(first); }
6.doSignalAll方法
private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; //循环将所有的节点取出 do { Node next = first.nextWaiter; first.nextWaiter = null; transferForSignal(first); first = next; } while (first != null); } final boolean transferForSignal(Node node) { /* * 如果不能改变状态,节点将会被取消 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* *将节点设置到AQS等待队列中 */ Node p = enq(node); int ws = p.waitStatus; //如果状态大于0,状态不等于SIGNAL唤醒节点线程 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } //添加节点,加载末尾 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
7.nextGeneration准备下一代
//重置下一代 private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
8.reset()重置
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // 打破当前一代 nextGeneration(); //开启新的一代 } finally { lock.unlock(); } }
相关文章推荐
- 第8章 Java中的并发工具类(CountDownLatch CyclicBarrier Semaphore Exchanger)
- Java并发工具类之CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- java高并发程序设计总结五:jdk并发包其他同步控制工具类:ReadWriteLock/CountDownLatch/CyclicBarrier/LockSupport
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- Java并发之CyclicBarrier 可重用同步工具类
- Java并发编程之2——同步工具类的使用(CountDownLatch,CyclicBarrier,BlockungQueue,Semaphore)
- java多线程解说【拾陆】_并发工具类:CyclicBarrier
- java 并发工具类CountDownLatch & CyclicBarrier
- Java中的并发工具类:CountDownLatch、CyclicBarrier和Semaphore
- Java并发:同步工具类详解(CountDownLatch、CyclicBarrier、Semaphore)
- java并发中的协同工具类介绍-CountDownLatch-CyclicBarrier-Semphone-Exchanger
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- 【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier
- Java多线程-并发工具类(一)同步屏障CyclicBarrier