您的位置:首页 > 编程语言 > Java开发

高并发学习笔记(五)

2019-03-26 11:16 85 查看

一、同步器的实现(一)

1. CountDownLatch

    CountDownLatch是jdk1.5之后引入的一个同步辅助类,它可以起到与synchronized类似的效果。它能够使一个或多个线程等待其他线程完成各自的工作之后在执行。通俗的说,CountDownLatch就像是一扇大门上加上多个锁,在大门被打开之前,所有使用CountDownLatch的线程都将被阻塞,但是一旦大门打开那么所有的线程将都能通过,这个被阻塞的状态称为闭锁。并且闭锁是一次性的,即门一旦打开,就不能再锁上了。其用法如下所示:

/**
* CountDownLatch用法示例
* Created by bzhang on 2019/3/16.
*/
public class TestCountDownLatch{
private CountDownLatch latch = new CountDownLatch(5);
public void freedLatch(){
while (latch.getCount()>0){   //还有门闩时,每两秒释放一个
latch.countDown();      //释放一个门闩
System.out.println(Thread.currentThread().getName()
+"释放一个门闩,现在还有"+latch.getCount()+"个门闩");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public void waitLatch(int i){
System.out.println(Thread.currentThread().getName()+"我先开始了。");
while (i>0){
try {
latch.await();    //阻塞等待门闩释放完,在执行下面的语句
System.out.println("甘霖娘,这么久还没完!");
TimeUnit.SECONDS.sleep(1);
i--;
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

public static void main(String[] args) {
TestCountDownLatch test = new TestCountDownLatch();
new Thread(new Runnable() {
@Override
public void run() {
test.waitLatch(3);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
test.freedLatch();
}
}).start();
}
}

    了解了CountDownLatch的用法,再来看看它的源码:

public class CountDownLatch {
//真正实现门闩功能的是sync线程同步队列,继承AbstractQueuedSynchronizer类。
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);    //设置线程同步等待队列的状态,当同步队列的状态值<=0时,等待队列中的线程就可以解除阻塞
}

//门闩上锁的数量就是同步队列的状态值
int getCount() {
return getState();
}

//检查锁对象是否可以获取。实现是AQS的共享模式
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;    //当同步队列的状态大于0表示门闩上的锁还没全部解开,继续阻塞等待
}

//countDown方法中实际调用该方法,每调用countDown方法一次就将队列的状态值-1,直到状态值为0
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))    //CAS操作更新状态值
return nextc == 0;
}
}
}

//同步队列,所有等待门闩释放的线程都会被加入一个等待队列中
private final Sync sync;

//构造一个有count把锁的门闩
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);    //内部维护了一个等待队列
}

//门闩上的锁都释放完之前,阻塞等待
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//在timeout时间内等待门闩上的所释放完,超出不继续等待。TimeUnit为时间单位。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//释放一把门闩上的锁
public void countDown() {
sync.releaseShared(1);    //令同步队列的状态值-1
}

//查看门闩上还有几把锁
public long getCount() {
return sync.getCount();
}
}

2. CyclicBarrier

    CyclicBarrier与CountDownLatch有些类似,但CyclicBarrier是在一组线程中所有的线程在同步点(屏障)阻塞,直到所有的线程均到达同步点才解除。可以理解为,有七个人分别去找七龙珠,只有七个人都找到龙珠,才能继续后面召唤神龙的步骤,否则先找到龙珠的人就只能等待其他人找到龙珠。也就是说要么所有线程都成功,要么都失败。

/**
* CyclicBarrier使用示例
* Created by bzhang on 2019/3/16.
*/
public class TestCyclicBarrier {
private CyclicBarrier barrier;

public TestCyclicBarrier() {
barrier = new CyclicBarrier(7, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"集齐7龙珠啦,可以召唤美少女战士了!");
}
});
}

public void findBall(){
System.out.println(Thread.currentThread().getName()+"正在找龙珠!");
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+"找到一颗龙珠!");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
TestCyclicBarrier test = new TestCyclicBarrier();
ExecutorService pool = Executors.newFixedThreadPool(7);    //容量为7的线程池
while (true){
for (int i=0;i<7;i++){
pool.submit(new Runnable() {
@Override
public void run() {
test.findBall();
}
});
}
try {
TimeUnit.SECONDS.sleep(3);
System.out.println("一年后。。。。他们又开始找龙珠了");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

    看完示例,同样来看看CyclicBarrier的源码:

public class CyclicBarrier {

//静态内部类,broken表示当前的屏障是否被打破
private static class Generation {
boolean broken = false;
}

//重入锁,与synchronized用法类似,也是AQS的实现,效率高一点,量级轻(后面介绍)
private final ReentrantLock lock = new ReentrantLock();
//用于唤醒被阻塞线程的条件
private final Condition trip = lock.newCondition();
//参与线程数
private final int parties;
//所有线程到达同步点后执行的后续任务,由最后一个到达同步电动额线程执行
private final Runnable barrierCommand;
//当前代,CyclicBarrier是可以重复使用的,当前这一代全部达到同步点后,可以恢复原状在继续使用,即下一代
private Generation generation = new Generation();

//当前等待进入屏障的线程数,即在到达同步点的线程数
private int count;

//当前代使用结束后,进入下一代
private void nextGeneration() {

trip.signalAll();    //唤醒所有线程
count = parties;    //到达同步点的数量恢复有参与线程数
generation = new Generation();    //重新生成一代
}

//Commond执行出现了意外,修改broken的值,然后继续执行被屏障拦截的线程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

//执行等待的方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();    //加锁
try {
final Generation g = generation;    //获取当前代

if (g.broken)    //判断当前代的屏障是否被打破,打破直接抛异常
throw new BrokenBarrierException();

if (Thread.interrupted()) {    //判断当前线程是否被中断,若被中断就打破屏障唤醒所有线程,在抛异常
breakBarrier();
throw new InterruptedException();
}
//减少到达同步点(屏障)的数量
int index = --count;
if (index == 0) {  //判断是否所有线程都到达同步点(或进入屏障)
boolean ranAction = false;    //command任务的标识
try {
final Runnable command = barrierCommand;
if (command != null)    //判断后续任务command是否为null,不为null就由最后一个到达同步点的线程执行
command.run();
ranAction = true;    //command任务执行成功
nextGeneration();    //开启下一代
return 0;
} finally {
if (!ranAction)    //判断执行command的任务是否成功,失败就打破屏障唤醒所有线程
breakBarrier();
}
}

for (;;) {
try {
if (!timed)    //判断是否设置了等待时间
trip.await();    //未设置则一直等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);    //等待指定时间
} catch (InterruptedException ie) {
//判断这一代屏障是否被打破过,若没有打破过就执行打破屏障的并唤醒所有线程的breakBarrier方法,在抛异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {

Thread.currentThread().interrupt();
}
}
//判断是否屏障是否被打破
if (g.broken)
throw new BrokenBarrierException();
//判断是否还是当前代
if (g != generation)
return index;
//超时判断
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();    //解锁
}
}

//构造一个参与线程数为parties,进入屏障后的任务为barrierAction的循环屏障
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

//构造一个参与线程数为parties,进入屏障后的任务为null的循环屏障
public CyclicBarrier(int parties) {
this(parties, null);
}

//获取参与线程数
public int getParties() {
return parties;
}

//所有线程到达同步点之前等待。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);    //真正实现等待的方法
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

//所有线程到达同步点之前等待timeout时间
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

//查看屏障是否打破
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

//重置
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier();   // 打破当前代屏障
nextGeneration(); // 启用下一代屏障
} finally {
lock.unlock();
}
}

//获取等待的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}

    由源码可以看出CyclicBarrier的底层是由重入锁ReentrantLock,这也是jdk1.5新增的量级相对较轻的锁,用法与synchronized类似。下面我们就来看看这重入锁ReentrantLock。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  JDK