您的位置:首页 > 编程语言 > Java开发

Java并发Concurrent包的锁(六)——CountDownLatch源码分析及使用

2018-03-08 13:27 615 查看
CountDownLatch 和 CyclicBarrier 比较相似,都属于并发的实用工具类。

CyclicBarrier 的源码分析和使用可以参考:Java并发Concurrent包的锁(五)——CyclicBarrier源码分析及使用

JDK 文档中的定义:

用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

两者的区别:

- CountDownLatch 的作用是允许 1 个(或N个)线程等待其他线程完成执行;而 CyclicBarrier 是允许 N 个线程相互等待;

- CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它是循环的。

下面通过源码来分析 CountDownLatch 。

Sync类

类中引用了 Sync 类的对象, Sync 类是扩展了 AbstractQueuedSynchronizer 而来:

使用了 AQS 类的 state 值来保存 count 数量。

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

// 构造函数,带count值参数
Sync(int count) {
setState(count);
}

// 获取count值
int getCount() {
return getState();
}

// 重写的方法,如果count为0,返回1,即所有线程都到位
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 重写的方法,实际是如果有线程到位了,让count减1
protected boolean tryReleaseShared(int releases) {
// 一直循环
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// 尝试设置state为减1后的值,CAS方法如果为c,才设置新值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}


属性

// 引用了sync类的对象
private final Sync sync;


构造函数

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 线程数量count,调用Sync类的构造函数
this.sync = new Sync(count);
}


重要方法

await

使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


这里我们看调用的 AQS 类中的对应方法:

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}


这里的 tryAcquireShared 方法在 sync 类中进行了重写,如果所有线程都到位了,返回 1 。如果 count 不为 0 ,说明还有线程没有到位,需要继续等待。

countDown

countDown 方法调用 sync 重写的 releaseShared 方法,有线程到位了就让未到位的线程数量减 1:

public void countDown() {
sync.releaseShared(1);
}


使用情景

比如还拿 CyclicBarrier 中的文件读取的例子,使用 CountDownLatch 来再重新实现:

屏障类中的那个例子只有 10 个子线程到达屏障后相互等待,现在要加入一个新的统计线程来等待这10个线程。

public class TestCDLatch {

private static CountDownLatch countDownLatch;

public static class SumThread extends Thread {

@Override
public void run() {
System.out.println("需要等待 " + countDownLatch.getCount() + " 个子线程运行完!");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("其它线程都运行完毕,可以开始统计了");
}

}

public static class CountThread extends Thread {

@Override
public void run() {
System.out.println("子线程" + Thread.currentThread().getName() + "读完了!");
countDownLatch.countDown();
}

}

public static void main(String[] args) {

countDownLatch = new CountDownLatch(10);

new SumThread().start();

for(int i=0; i<10; i++){
new CountThread().start();
}

}

}


运行结果:

需要等待 10 个子线程运行完!
子线程Thread-1读完了!
子线程Thread-2读完了!
子线程Thread-3读完了!
子线程Thread-4读完了!
子线程Thread-5读完了!
子线程Thread-6读完了!
子线程Thread-7读完了!
子线程Thread-8读完了!
子线程Thread-10读完了!
子线程Thread-9读完了!
其它线程都运行完毕,可以开始统计了
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: