高并发学习笔记(五)
2019-03-26 11:16
85 查看
一、同步器的实现(一)
1. CountDownLatch
CountDownLatch是jdk1.5之后引入的一个同步辅助类,它可以起到与synchronized类似的效果。它能够使一个或多个线程等待其他线程完成各自的工作之后在执行。通俗的说,CountDownLatch就像是一扇大门上加上多个锁,在大门被打开之前,所有使用CountDownLatch的线程都将被阻塞,但是一旦大门打开那么所有的线程将都能通过,这个被阻塞的状态称为闭锁。并且闭锁是一次性的,即门一旦打开,就不能再锁上了。其用法如下所示:
/** * CountDownLatch用法示例 * Created by bzhang on 2019/3/16. */ public class TestCountDownLatch{ private CountDownLatch latch = new CountDownLatch(5); public void freedLatch(){ while (latch.getCount()>0){ //还有门闩时,每两秒释放一个 latch.countDown(); //释放一个门闩 System.out.println(Thread.currentThread().getName() +"释放一个门闩,现在还有"+latch.getCount()+"个门闩"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } } public void waitLatch(int i){ System.out.println(Thread.currentThread().getName()+"我先开始了。"); while (i>0){ try { latch.await(); //阻塞等待门闩释放完,在执行下面的语句 System.out.println("甘霖娘,这么久还没完!"); TimeUnit.SECONDS.sleep(1); i--; } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { TestCountDownLatch test = new TestCountDownLatch(); new Thread(new Runnable() { @Override public void run() { test.waitLatch(3); } }).start(); new Thread(new Runnable() { @Override public void run() { test.freedLatch(); } }).start(); } }
了解了CountDownLatch的用法,再来看看它的源码:
public class CountDownLatch { //真正实现门闩功能的是sync线程同步队列,继承AbstractQueuedSynchronizer类。 private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); //设置线程同步等待队列的状态,当同步队列的状态值<=0时,等待队列中的线程就可以解除阻塞 } //门闩上锁的数量就是同步队列的状态值 int getCount() { return getState(); } //检查锁对象是否可以获取。实现是AQS的共享模式 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; //当同步队列的状态大于0表示门闩上的锁还没全部解开,继续阻塞等待 } //countDown方法中实际调用该方法,每调用countDown方法一次就将队列的状态值-1,直到状态值为0 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //CAS操作更新状态值 return nextc == 0; } } } //同步队列,所有等待门闩释放的线程都会被加入一个等待队列中 private final Sync sync; //构造一个有count把锁的门闩 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); //内部维护了一个等待队列 } //门闩上的锁都释放完之前,阻塞等待 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //在timeout时间内等待门闩上的所释放完,超出不继续等待。TimeUnit为时间单位。 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //释放一把门闩上的锁 public void countDown() { sync.releaseShared(1); //令同步队列的状态值-1 } //查看门闩上还有几把锁 public long getCount() { return sync.getCount(); } }
2. CyclicBarrier
CyclicBarrier与CountDownLatch有些类似,但CyclicBarrier是在一组线程中所有的线程在同步点(屏障)阻塞,直到所有的线程均到达同步点才解除。可以理解为,有七个人分别去找七龙珠,只有七个人都找到龙珠,才能继续后面召唤神龙的步骤,否则先找到龙珠的人就只能等待其他人找到龙珠。也就是说要么所有线程都成功,要么都失败。
/** * CyclicBarrier使用示例 * Created by bzhang on 2019/3/16. */ public class TestCyclicBarrier { private CyclicBarrier barrier; public TestCyclicBarrier() { barrier = new CyclicBarrier(7, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"集齐7龙珠啦,可以召唤美少女战士了!"); } }); } public void findBall(){ System.out.println(Thread.currentThread().getName()+"正在找龙珠!"); try { TimeUnit.SECONDS.sleep(1); System.out.println(Thread.currentThread().getName()+"找到一颗龙珠!"); barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } public static void main(String[] args) { TestCyclicBarrier test = new TestCyclicBarrier(); ExecutorService pool = Executors.newFixedThreadPool(7); //容量为7的线程池 while (true){ for (int i=0;i<7;i++){ pool.submit(new Runnable() { @Override public void run() { test.findBall(); } }); } try { TimeUnit.SECONDS.sleep(3); System.out.println("一年后。。。。他们又开始找龙珠了"); } catch (InterruptedException e) { e.printStackTrace(); } } } }
看完示例,同样来看看CyclicBarrier的源码:
public class CyclicBarrier { //静态内部类,broken表示当前的屏障是否被打破 private static class Generation { boolean broken = false; } //重入锁,与synchronized用法类似,也是AQS的实现,效率高一点,量级轻(后面介绍) private final ReentrantLock lock = new ReentrantLock(); //用于唤醒被阻塞线程的条件 private final Condition trip = lock.newCondition(); //参与线程数 private final int parties; //所有线程到达同步点后执行的后续任务,由最后一个到达同步电动额线程执行 private final Runnable barrierCommand; //当前代,CyclicBarrier是可以重复使用的,当前这一代全部达到同步点后,可以恢复原状在继续使用,即下一代 private Generation generation = new Generation(); //当前等待进入屏障的线程数,即在到达同步点的线程数 private int count; //当前代使用结束后,进入下一代 private void nextGeneration() { trip.signalAll(); //唤醒所有线程 count = parties; //到达同步点的数量恢复有参与线程数 generation = new Generation(); //重新生成一代 } //Commond执行出现了意外,修改broken的值,然后继续执行被屏障拦截的线程 private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } //执行等待的方法 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); //加锁 try { final Generation g = generation; //获取当前代 if (g.broken) //判断当前代的屏障是否被打破,打破直接抛异常 throw new BrokenBarrierException(); if (Thread.interrupted()) { //判断当前线程是否被中断,若被中断就打破屏障唤醒所有线程,在抛异常 breakBarrier(); throw new InterruptedException(); } //减少到达同步点(屏障)的数量 int index = --count; if (index == 0) { //判断是否所有线程都到达同步点(或进入屏障) boolean ranAction = false; //command任务的标识 try { final Runnable command = barrierCommand; if (command != null) //判断后续任务command是否为null,不为null就由最后一个到达同步点的线程执行 command.run(); ranAction = true; //command任务执行成功 nextGeneration(); //开启下一代 return 0; } finally { if (!ranAction) //判断执行command的任务是否成功,失败就打破屏障唤醒所有线程 breakBarrier(); } } for (;;) { try { if (!timed) //判断是否设置了等待时间 trip.await(); //未设置则一直等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); //等待指定时间 } catch (InterruptedException ie) { //判断这一代屏障是否被打破过,若没有打破过就执行打破屏障的并唤醒所有线程的breakBarrier方法,在抛异常 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //判断是否屏障是否被打破 if (g.broken) throw new BrokenBarrierException(); //判断是否还是当前代 if (g != generation) return index; //超时判断 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); //解锁 } } //构造一个参与线程数为parties,进入屏障后的任务为barrierAction的循环屏障 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } //构造一个参与线程数为parties,进入屏障后的任务为null的循环屏障 public CyclicBarrier(int parties) { this(parties, null); } //获取参与线程数 public int getParties() { return parties; } //所有线程到达同步点之前等待。 public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); //真正实现等待的方法 } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } //所有线程到达同步点之前等待timeout时间 public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); } //查看屏障是否打破 public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } } //重置 public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // 打破当前代屏障 nextGeneration(); // 启用下一代屏障 } finally { lock.unlock(); } } //获取等待的线程数 public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } }
由源码可以看出CyclicBarrier的底层是由重入锁ReentrantLock,这也是jdk1.5新增的量级相对较轻的锁,用法与synchronized类似。下面我们就来看看这重入锁ReentrantLock。
相关文章推荐
- Go语言并发与并行学习笔记(一)
- 学习笔记(九)并发(三)
- Go语言并发与并行学习笔记(一)
- Java并发学习笔记(八)-LinkedBlockingQueue
- Go语言学习笔记-并发
- 并发编程学习笔记之Lock与synchronized
- Java并发学习笔记(15)信号量(Semaphore) 关卡((2)CyclicBarrier)
- 学习笔记:java并发编程学习之初识Concurrent
- JAVA并发编程学习笔记之ReentrantLock (r)
- MySQL学习笔记之四:并发控制和事务机制
- Java并发编程学习笔记 深入理解volatile关键字的作用
- 学习JAVA多线程编程 --- 《JAVA多线程编程核心技术》第2章 对象及变量的并发访问 笔记
- WCF学习笔记之并发与限流
- Java并发读书学习笔记(九)——性能与可伸缩性
- Java并发读书学习笔记(十)——显式锁
- Java学习笔记—多线程(并发工具类)
- Java并发读书学习笔记(十一)——原子变量与非阻塞同步机制
- 并发编程实战学习笔记(七)——避免活跃性问题
- 并发编程实战学习笔记(八)——性能与可伸缩性
- 并发学习笔记(三):join与wait/notify