您的位置:首页 > 其它

高并发第十单:J.U.C AQS 组件:CountDownLatch. CyclicBarrier .Semaphore

2018-09-22 22:49 926 查看
这里有一篇介绍AQS的文章 非常好: Java并发之AQS详解

AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向列表。

public class CountDownLatchDemo {

private static final int THREAD_COUNT_NUM = 8;
private static CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT_NUM);

public static void main(String[] args) throws InterruptedException {

for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "第" + index + "个任务完成!"
+ Thread.currentThread().getName());
// 模拟收集第i个龙珠,随机模拟不同的寻找时间
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "完成:" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
// 每收集到一颗龙珠,需要等待的颗数减1
countDownLatch.countDown();

}, "我是线程:" + i + ":").start();
;
}
// 等待检查,即上述7个线程执行完毕之后,执行await后边的代码
countDownLatch.await();
System.out.println("所有任务完成!" + System.currentTimeMillis());

}
}

结果
...............
所有任务完成!1537618986091
............
我是线程:39:完成:1537618986097
我是线程:37:完成:1537618986097
我是线程:38:完成:1537618986097
我是线程:35:完成:1537618986096


View Code
所以更加证明了.

CountDownLatch(THREAD_COUNT_NUM); 最多锁住 THREAD_COUNT_NUM 个的线程,其他的线程就按原来的顺序运行了

这个就直接证明了 在await()处,让所有的任务完成了 才能继续主线程

优点:

CountDownLatch的优点毋庸置疑,对使用者而言,你只需要传入一个int型变量控制任务数量即可,至于同步队列的出队入队维护,state变量值的维护对使用者都是透明的,使用方便。

缺点:

CountDownLatch设置了state后就不能更改,也不能循环使用。

2.CyclicBarrier



既然说了 CountDownLatch设置了state后就不能更改,也不能循环使用。那就来个可以循环使用的

举个例子:有四个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通过。其实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等到其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要相互等待。这和CountDownLatch的应用场景有区别,CountDownLatch里的线程是到了运行的目标后继续干自己的其他事情,而这里的线程需要等待其他线程后才能继续完成下面的工作。

案例一: 一起等待

public class CyclicBarrierDemo {

private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) {

for (int i = 0; i < 10; i++) {
int index = i;
new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println(String.format("我是第%s启动了", index));
barrier.await();
System.out.println(String.format("我是第%s完成了", index));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

}, "我是第" + index + "个线程:").start();
}

}
}

结果:
我是第6启动了
.......
我是第0启动了
.......
我是第4完成了了


全部启动,然后一起等待,再继续完成任务

//案例二 最多等待时间

private static void test2() {
for (int i = 0; i < 10; i++) {
int index = i;
new Thread(() -> {
try {
System.out.println(String.format("我是第%s启动了", index));
// 最多阻塞时间
barrier.await(2000, TimeUnit.MILLISECONDS);
System.out.println(String.format("我是第%s完成了", index));
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
e.printStackTrace();
barrier.reset();
}

}).start();
}

}


// 还有一个额外的方法是 构造是可以多构造一个Runnable,在计数器的值到达设定值后(但在释放所有线程之前),该Runnable运行一次,注,Runnable在每个屏障点只运行一个

private static CyclicBarrier barrier = new CyclicBarrier(1,()->{
System.out.println("优先执行我");
});

for (int i = 0; i < 2; i++) {
int index = i;
new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println(String.format("我是第%s启动了", index));
barrier.await();
System.out.println(String.format("我是第%s完成了", index));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}

}, "我是第" + index + "个线程:").start();
}

结果是:
我是第1启动了
我是第0启动了
优先执行自己
优先执行自己
我是第1完成了
我是第0完成了


CyclicBarrier 和 CountDownLatch的比较:


CountDownLatch: 一个线程(或者多个), 等待另外N个线程完成某个事情之后才能执行。--> 反正 你执行完 就ok.不能随意放开

CyclicBarrier: N个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。--> 可以到到某个条件.我放开就行了

CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。

CountDownLatch:减计数方式,CyclicBarrier:加计数方式

3. Semaphore

信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。

比喻:

  Semaphore是一件可以容纳N人的房间,如果人不满就可以进去,如果人满了,就要等待有人出来。对于N=1的情况,称为binary semaphore。一般的用法是,用于限制对于某一资源的同时访问

官方一点就是:

用于保证同一时间并发访问线程的数目。

信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。

Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。 在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。

使用场景:仅能提供有限访问的资源。比如数据库连接

上例子:

// 方式 一 直接获取

// 给出10个资源 ,最多保证10个并发
private static final Semaphore SEMAPHORE = new Semaphore(10);
.......
for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
SEMAPHORE.acquire();// 获取一个许可
System.out.println(String.format("我是线程:%s", index));// 需要并发控制的内容
Thread.sleep(3000);
SEMAPHORE.release(); // 释放一个许可
} catch (InterruptedException e) {
e.printStackTrace();
}

}).start();
}

//结果:
很明显的能看到 10个10的执行


// 方式 二 尝试获取许可,获取不到不执行 很多时候相当于只执行设置的并发量一次

private static final Semaphore SEMAPHORE = new Semaphore(10);

for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
// 尝试获取许可,获取不到不执行
if(SEMAPHORE.tryAcquire() {
System.out.println(String.format("我是线程:%s", index));// 需要并发控制的内容
Thread.sleep(3000);
SEMAPHORE.release(); // 释放一个许可
}

} catch (InterruptedException e) {
e.printStackTrace();
}

}).start();


// 当时三 有个最长申请时间

private static final Semaphore SEMAPHORE = new Semaphore(10);
for (int i = 0; i < 100; i++) {
int index = i;
new Thread(() -> {
try {
//  尝试获取许可,获取不到不执行 最长申请时间
if(SEMAPHORE.tryAcquire(5000,TimeUnit.MILLISECONDS)) {
System.out.println(String.format("我是线程:%s", index));// 需要并发控制的内容
Thread.sleep(3000);
SEMAPHORE.release(); // 释放一个许可
}

} catch (InterruptedException e) {
e.printStackTrace();
}

}).start();
}


注意:

1 . 其中 构造方法可以加公平锁 :private static final Semaphore SEMAPHORE = new Semaphore(100,true);

2. SEMAPHORE.tryAcquire() => 可以增加获取条件量 SEMAPHORE.tryAcquire(10);释放 SEMAPHORE.release(10);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: