您的位置:首页 > 编程语言 > Java开发

java高并发、多线程(七)--CountDownLatch

2020-06-11 04:23 459 查看

java高并发、多线程(七)

CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

一个简单的例子

public class CountDownLatchTest {
public static void main(String[] args) {
List<Thread> list = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
final int num = i;
list.add(new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread-" + num + "执行结束");
countDownLatch.countDown();
}
},"Thread-"+i));
}
list.forEach(t -> t.start());
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("程序执行结束");
}
}

执行结果:

从执行结果可以看出,待前面10个线程执行结束后,main才最终继续执行完成。
下面我们来看一看CountDownLatch 源码是怎么实现的。

源码

CountDownLatch 主要结构:

public class CountDownLatch {

private static final class Sync extends AbstractQueuedSynchronizer {/**...*/}

private final Sync sync;

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
}

从上面源码可以看到,CountDownLatch 内包含一个实现AQS的内部类Sync ,构造方法主要是创建一个Sync对象赋值到sync属性上。
下面我们来看看Sync类:

Sync类

private static final class Sync extends AbstractQueuedSynchronizer {
//构造方法
Sync(int count) {
//设置state
setState(count);
}
//获取state
int getCount() {
return getState();
}
//尝试以共享模式获取锁
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//尝试以共享模式释放锁,也就是state - 1
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//CAS设置state
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

通过以上源码,我们可以知道CountDownLatch 主要是通过state的值来控制线程。
我们具体再来梳理一下文初的事例是如何运行的:

CountDownLatch countDownLatch = new CountDownLatch(10)

1、创建CountDownLatch对象,根据源码,我们可以知道此时对象的state设置为10。
2、创建10个线程,并添加进list。
3、启动10个线程,每个线程将执行countDownLatch.countDown();

//CountDownLatch类方法
public void countDown() {
//释放共享锁
sync.releaseShared(1);
}
//AQS类方法
public final boolean releaseShared(int arg) {
//尝试释放共享锁,state-1
if (tryReleaseShared(arg)) {
//唤醒后续节点
doReleaseShared();
return true;
}
return false;
}

4、执行countDownLatch.await(),使主线程在锁存器倒计数至零之前一直等待,除非线程被中断

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//state不为0时
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//以共享模式入等待队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//前驱节点为头节点
if (p == head) {
//尝试获取锁
int r = tryAcquireShared(arg);
//state为0
if (r >= 0) {
//设置头节点,并唤醒后续节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//若还有线程处于阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//唤醒后续节点
doReleaseShared();
}
}

从上述代码可以知道,在创建countDownLatch对象时将注册state,每个线程执行countDown()后,state将会减1,若state不为0,await()方法将会阻塞直至state为0,从而实现上述功能。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: