您的位置:首页 > 编程语言 > Java开发

java并发编程——八 理解分析并发组件-CountDownLatch\CyclicBarrier\Exchanger\Semaphore

2016-03-26 21:02 881 查看
从这章开始介绍 java.concurrent.*中的常用并发工具类,并附带有趣的示例

CountDownLatch

概要

按照需要同时阻塞的线程个数count,初始化对象。

CountDownLatch countDownLatch = new CountDownLatch(count);

每次调用countDownLatch.countDown(), count计数器-1。

countDownLatch.await()阻塞当前线程,直到count值为0恢复运行。

适合用在多个线程协助;或用在多个点或多个线程任务之间协作,比如:

使用CountDownLatch对象,并设置一个初始值,

如 CountDownLatch countDownLatch = new CountDownLatch(5).

在Task F中调用countDownLatch.await()阻塞当前线程,直到count值为0时,自动运行。

Task A….Task E ,这5个任务中,调用countDownLatch.countDown(),表示每完成一个任务就将count值减一.

Task A…E运行完之后,Task F将不再阻塞,立刻运行。

以上描述对应代码实现:

class PortionTask implements Runnable {

private static int counter = 0;
private int id = counter++;
private CountDownLatch countDownLatch;

public PortionTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
System.out.println("portion task complete " + this + "   Thread[" + Thread.currentThread().getName() + "]");
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
countDownLatch.countDown();
}

@Override
public String toString() {
return "PortionTask [id=" + id + "]";
}

}

class TaskWaiting implements Runnable {

private CountDownLatch countDownLatch;

public TaskWaiting(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("it's my turn " + this);
}

}

public class CountDownLatchDemoMine {
public static void main(String[] args) {
int SIZE = 5;

CountDownLatch c = new CountDownLatch(SIZE);

ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new PortionTask(c));// A Task
exec.execute(new PortionTask(c));// B Task
exec.execute(new PortionTask(c));// C Task
exec.execute(new PortionTask(c));// D Task
exec.execute(new PortionTask(c));// E Task
exec.execute(new TaskWaiting(c));// Final Task F
exec.shutdown();
}
}


CountDownLatch源码分析

1.初始化

CountDownLatch countDownLatch = new CountDownLatch(count);

//从锁的角度理解,CountDownLatch初始化完成后,就有count把锁占用
//CountDownLatch

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);//对(AQS)sync同步器初始化
}

private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {//入参代表锁state的数值
setState(count);// volatile set
}
............


2.CountDownLatch中实现的同步器(extends AQS)

CountDownLatch也是通过AQS来实现的,我们先来看下这个AQS的内部实现类:

//CountDownLatch.Sync
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {//入参代表锁state的数值
setState(count);
}

int getCount() {
return getState();
}

//使用了共享模式锁,当state==0时表示锁空闲.
// -1获取锁失败 1 获取成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//共享模式释放锁,尝试获取获取锁
//false:释放共享锁失败
//true:调用AQS方法唤醒sync的后继等待结点
protected boolean tryReleaseShared(int releases) {
// count递减,直到为0后唤醒阻塞的线程
for (;;) {
int c = getState();
if (c == 0)
return false;//锁已经被释放
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;//表是锁已经被完全释放
}
}
}


3.countDownLatch.await() 阻塞。因为由共享锁实现,所以这方法可以被多次调用,也就是说可以重复调用这个方法阻塞多个线程

//CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//共享式 可中断获取锁
}


AbstractQueuedSynchronizer.java 详细分析AQS请点击查看之前文章

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())//响应中断
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)//上文已述。state!=0的时候都会返回true
//doAcquireSharedInterruptibly主要逻辑(详细请参考AQS文章):
//1.构造当前线程为共享结点;
//2.加入到sync队列尾部;
//3.再次尝试获取共享锁,成功则传播式唤醒所有后继共享结点。
//4.获取锁失败park阻塞,直到被前驱唤醒。
doAcquireSharedInterruptibly(arg);
}


//CountDownLatch.Sync

//使用了共享模式锁,当state==0时表示锁空闲。那么通常new CountDownLatch(count)初始化后state==count,返回-1。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}


使用AQS完成了锁的获取,如果获取锁失败也就是说countDownLatch计数器不为0,那么获取锁失败,以共享模式进入sync队列中等待(park)。

也就是通常new CountDownLatch(count)初始化时,默认代表了count把锁被占用,直到countDown()多次被调用,count==0,各个阻塞线程才能获取锁,从阻塞状态被唤醒。

await()总结

初始化CountDownLatch countDownLatch = new CountDownLatch(count),设置了state同步状态==count,那么每次一个线程中调用countDownLatch.await(),就会执行tryAcquireShared(),因为state!=0所以返回-1,继续执行AQS父类的doAcquireSharedInterruptibly(),当前结点将被构造为一个共享结点,放入Sync同步队里中进行阻塞。

3.countDownLatch.countDown()

/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
* 每次一个线程执行countDown(),state都会-1.
*/
public void countDown() {
sync.releaseShared(1);
}


——>

//AbstractQueuedSynchronizer.releaseShared:
/**
* Releases in shared mode.  Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument.  This value is conveyed to
*        {@link #tryReleaseShared} but is otherwise uninterpreted
*        and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();//state--,当state==0时执行这个方法,执行AQS的doReleaseShared(),所有Sync队列中的共享结点将被依次唤醒。
return true;
}
return false;
}


//CountDownLatch.Sync.tryReleaseShared(1)

//共享模式释放锁,尝试获取获取锁
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;//锁已经被释放
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;//true:state到达0.调用AQS方法唤醒sync的后继等待结点
}
}
}


当nextc==0表示锁释放处于空闲状态,进入AQS中doReleaseShared()唤醒后继结点(await的线程构造的结点)。

如果对AQS不清楚,请先详解请查看AQS

countDown()总结

多个线程调用了countDownLatch.await()将当前线程阻塞,直到执行countDownLatch.countDown()使state做减法操作,直到state==0时,表示当前线程获取到了锁。

当前线程将进入aqs.doReleaseShared()方法中,唤醒头结点(也就是当前线程代表的结点)的后继结点,后继结点如果都是共享模式的结点,那么会依次唤醒自己的共享后继结点。

所有的await线程将被唤醒。被唤醒的线程继续获取锁,因为CountDownLatch.tryAcquireShared(..)的实现可以保证多个线程获取一把锁,所以后继结点将都获取锁,接着执行doReleaseShared()中的unparkSuccessor,保证了所有共享结点都被立马唤醒。

然后所有之前await的线程,都将从Sync队列中被摘下,从await()上返回继续执行最上层代码。

CountDownLatch 整体总结

概述:通过内部state计数的方式,协调多个线程的阻塞,并使它们在某一时刻同时唤醒。

CountDownLatch 内部定义了一个AQS的子类,通过使用AQS中的方法

tryReleaseShared\tryAcquireShared\releaseShared\doAcquireSharedInterruptibly方法,构造了一个共享锁的组件。

初始化时,通过传入参数,初始化了aqs state的数值。

await() 某个线程调用await,因为state!=0都将被park阻塞,被构造为共享结点进入Sync队列排队。

countDown() 每次调用,state数值都将-1,直到某个线程-1后state==0。接着进入aqs.releaseShared方法释放锁,传播式唤醒后继结点中的所有共享结点(就是所有await等待的线程结点,将会从await方法上返回)

CyclicBarrier

直接上代码,一个很多有趣的短跑比赛模拟程序,详见注释,看完你就理解了CyclicBarrier:

class Player implements Runnable {

private static int counter = 0;
private int id = counter++;
private int moveDistance;
private static CyclicBarrier barrier;
private static Random random = new Random(47);

public Player(CyclicBarrier b) {
barrier = b;
}

public int getMoveDistance() {
return moveDistance;
}

@Override
public void run() {
while (!Thread.interrupted()) {
moveDistance += random.nextInt(3);
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

@Override
public String toString() {
return "" + id;
}

}

public class Race {
private int playerNum = 5;
private int finalLine = 20;
private CyclicBarrier barrier;// 唯一的CyclicBarrier对象

private List<Player> players = new ArrayList<Player>();
private ExecutorService es = Executors.newCachedThreadPool();

public Race() {

// 1 构建CyclicBarrier(int parties, Runnable barrierAction)
// 当n个player都在这个barrier上调用了barrier.await(),这个barrier
// 对象将重置,并使用最后一个调用await的线程去运行barrierAction,
// 唤醒所有wait的线程,
// 然后再一次等待nHourse全部执行await....周而复始;直到shutdownNow
// 查看CyclicBarrier构造器的JDoc,有详细描述
barrier = new CyclicBarrier(playerNum, new Runnable() {
@Override
public void run() {
// 1.打印赛道
StringBuffer raceLine = new StringBuffer();
raceLine.append(" ");
for (int i = 0; i < finalLine; i++) {
raceLine.append("=");
}
System.out.println(" 100米男子赛道");
System.out.println(raceLine);

// 2.打印当前参赛者行进位置
for (Player player : players) {
StringBuffer playerLocation = new StringBuffer();
playerLocation.append(player);
for (int i = 0; i < player.getMoveDistance(); i++) {
playerLocation.append("*");
}
System.out.println(playerLocation);

}
// 3.检查是否有参赛者完成比赛,如果是,结束比赛;
// (会存在多个player同时越过终点)
for (Player player : players) {
if (player.getMoveDistance() >= finalLine) {
System.out.println("bravo: player" + player + " won!!! ");
es.shutdownNow();// 该方法并不保证池中所有线程退出
return;
}

}
try {
TimeUnit.MILLISECONDS.sleep(80);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 2.任务执行:所有player开始跑
for (int i = 0; i < playerNum; i++) {
// 每个Player中传入同一个CyclicBarrier对象
Player p = new Player(barrier);
players.add(p);
es.execute(p);
}
}

public static void main(String[] args) {
new Race();

}

}


output:

100米男子赛道
==========
0**
1**
2*
3**
4*

..............................

100米男子赛道
==========
0*******
1*********
2*****
3******
4******
100米男子赛道
==========
0********
1**********
2*****
3*******
4******
bravo: player1 won!!!


CyclicBarrier源码分析

1.构造 内部组合了ReentrantLock以及Condition

//CyclicBarrier
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/**  需要await的次数*/
private final int parties;
/* 当逃脱发生时(也就是parties次调用了这个await()),执行自定义的任务 */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

/**
* 参与者(等待者)的数量。
* 当count达到0,则count会被重置为初始值parties
*
*因为使用Condition的前提是已经获取到了RenntrantLock的锁,所以这个变量不需要使用volatile修饰
*/
private int count;

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
this(parties, null);
}


2.cyclicBarrier.await() 当前线程阻塞

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//获取ReentrantLock锁
try {
final Generation g = generation;

if (g.broken)//默认false
throw new BrokenBarrierException();

if (Thread.interrupted()) {//对中断响应
breakBarrier();//当前线程中断;signalAll同时唤醒其他等待者
throw new InterruptedException();
}

int index = --count;//在await()中执行count--.(与CountDownLatch)
if (index == 0) {  // tripped 跳脱。parites次调用了await。那么index==0
boolean ranAction = false;
try {
/* 当逃脱发生时(也就是parties次调用了这个await()),可以直接执行自定义的任务 */
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();//主要逻辑:signalAll();重置count
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();//使用Condition.await 阻塞
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)//响应中断抛出异常
throw new BrokenBarrierException();

if (g != generation)//count已经重置了,新的一轮已经开始
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

private void nextGeneration() {
// 唤醒所有在这个condition上park等待的线程.也就是将这些线程对应的结点放入sync队列中。
trip.signalAll();
// set up next generation
count = parties;//重置count
generation = new Generation();
}


小结

内部组合了ReentrantLock的Condition做实现。

每次调用CyclicBarrier.await(),内部计数器会-1,然后阻塞在condition.await()

直到最后一个线程调用CyclicBarrier.await,计数器达到0:唤醒所有等待的线程(condition.notifyALL),同时将计数器置为初始化状态以便下次继续使用。

CountDownLatch vs CyclicBarrier

内部实现:前者内部基于共享模式实现了AQS的子类。后者直接使用ReentrantLock类、Condition类以及一个计数器实现线程之间的同步。

功能:

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重

置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数

器,并让线程重新执行一次。另外,CyclicBarrier的await方法中会自动-1,而CountDownLatch中需要调用countDown().

Exchanger

示例描述:

一个生产者线程生产消息 msgX,然后调用exchanger.exchang(msgX),此时该线程Blocked,等待另一个线程(消费者)去消费:消费者调用 exchanger.exchange(“此参数会传递给生产者线程,并不需要”),获得生成者刚才生成的消息,并唤醒阻塞的生产者.

public class ExchangerTest {

private static final Exchanger<List<String>> exgr = new Exchanger<List<String>>();

private static ExecutorService threadPool = Executors.newFixedThreadPool(10);

private static Random rand = new Random(47);

private static Runnable producerProductMsg() {
return new Runnable() {

@Override
public void run() {
List<String> producerMsgX = new ArrayList<String>();
for (int i = 0; i < 10; i++) {
producerMsgX.add(i + rand.nextInt(100) + "");
}
try {
// 生产,阻塞
// Waits for another thread to arrive at this exchange
// point
exgr.exchange(producerMsgX);
System.out.println("    ThreadName:" + Thread.currentThread().getName() + " producerProductMsg:"
+ producerMsgX);

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

};
}

private static Runnable consumerConsumeMsg() {
return new Runnable() {
@Override
public void run() {
try {
List<String> getMsgFromProducer = exgr.exchange(new ArrayList<String>());
System.out.println("    ThreadName:" + Thread.currentThread().getName() + " consumerConsumeMsg:"
+ getMsgFromProducer);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

};
}

public static void main(String[] args) {

// 消息生成者
threadPool.execute(producerProductMsg());
// 消费者 消费
threadPool.execute(consumerConsumeMsg());
threadPool.shutdown();
}
}


Semaphore

要点

控制使用资源的并发数量

对象池实现示例

class Fat {
private volatile double d;
private static int counter = 0;
private int id = counter++;

public Fat() {
for (int i = 0; i < 10000; i++) {
d += Math.PI + Math.E;
}
}

public void operation() {
System.out.println("opreation:" + this);
}

@Override
public String toString() {
return "Fat [id=" + id + "]";
}

}

public class Pool<T> {

private int size;
private List<T> items = new ArrayList<T>();
private volatile boolean[] checkOut;
private Semaphore available;

public Pool(Class<T> classObj, int size) {
this.size = size;
checkOut = new boolean[size];
available = new Semaphore(size, true);

// 初始化对象池
for (int i = 0; i < size; i++) {
try {
items.add(classObj.newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

private synchronized T getItem() {
for (int i = 0; i < size; i++) {
if (!checkOut[i]) {// 未被取出,那么取出使用
checkOut[i] = true;
return items.get(i);
}

}
return null;// Semaphore 阻止了对象池被使用空的情况
}

private synchronized boolean releaseItem(T x) {
int index = items.indexOf(x);
if (-1 == index) {
return false;// 该对象x并不在池中
}
if (checkOut[index]) {// 该对象不在池中
checkOut[index] = false;
return true;
}
return false;// 对象未被从池中取出
}

public T checkOut() throws InterruptedException {
available.acquire();
return getItem();
}

public void checkIn(T x) {
if (releaseItem(x)) {
available.release();
}
}

}


class CheckoutTast<T> implements Runnable {
private static int counter = 0;
private int id = counter++;
private Pool<T> objPool;

public CheckoutTast(Pool<T> objPool) {
this.objPool = objPool;
}

@Override
public void run() {
try {
T item = objPool.checkOut();
System.out.println(this + " checked out " + item);
TimeUnit.SECONDS.sleep(1);
System.out.println(this + " checked in " + item);
objPool.checkIn(item);
} catch (Exception e) {
e.printStackTrace();
}

}

@Override
public String toString() {
return "CheckoutTast [id=" + id + "]";
}

}

public class SemaphoreDemo {
final static int SIZE = 5;

public static void main(String[] args) throws InterruptedException {
final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE);
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < SIZE; i++) {
exec.execute(new CheckoutTast<Fat>(pool));
}

System.out.println("All checkout Tasks were created,size:" + SIZE);
List<Fat> list = new ArrayList<Fat>();
// 取出池中所有的对象
for (int i = 0; i < SIZE; i++) {
Fat f = pool.checkOut();
System.out.println(">>>>" + i + "[main Thread checked out]:" + f);
f.operation();
list.add(f);
}
// 池中没有客用对象,由于使用了Semaphore,阻塞
Future<?> blocked = exec.submit(new Runnable() {
@Override
public void run() {
// Semaphore 会阻塞超额的CheckOut
try {
pool.checkOut();
} catch (InterruptedException e) {
System.out.println("blocked interrupted");
}
}
});

TimeUnit.SECONDS.sleep(5);
blocked.cancel(true);
System.out.println();

// 返回池中所有对象
for (Fat f : list) {
pool.checkIn(f);
}
for (Fat f : list) {// 此时checkIn总是返回False,冗余checkIn被忽略
pool.checkIn(f);
}

exec.shutdown();
}
}


Semaphore源码分析

通常用来控制并发访问资源的线程数量,比如连接池。

aqs共享模式实现的共享锁(还有一个是ReentrantReadAndWriteLock)。

可用于对某一资源的限量访问,超过某阈值阻塞后边的进程进入Sync排队阻塞。

同样是使用AQS实现,同时实现了公平与非公平模式(类似ReentrantLock)。源码与之前都很类似,很简单,理解的关键是理解AQS

public Semaphore(int permits) {
sync = new NonfairSync(permits);//初始化。默认使用非公平锁
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

//AQS实现 AQS——>Sync——>NonfairSync\FairSync

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) {
setState(permits);// 初始化state
}

final int getPermits() {
return getState();
}

// 非公平模式 获取共享锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {// 自旋
int available = getState();
int remaining = available - acquires;// (允许进入的线程)剩余个数
if (remaining < 0 || compareAndSetState(available, remaining))// 没有达到阈值并且CAS获取锁状态成功
return remaining;// 返回剩余个数
// 如果位负数表示已经达到阈值,会阻塞当前线程进入AQSsync队列排队
}
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {// 自旋
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;// 唤醒后继等待结点
}
}

final void reducePermits(int reductions) {
for (;;) {// 自旋
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

final int drainPermits() {
for (;;) {// 自旋
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);// 通过自旋方式获取锁,如果达到阈值,那么waiting
}
}

/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {// 通过自旋方式获取锁,无waiting
if (hasQueuedPredecessors())// 保证公平性
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
}


使用AQS构造了共享模式的锁,不支持重入。当一个线程调用semaphore.acquire回去一把锁,当获取的次数达到阈值,则后续的获取线程会被阻塞进入Sync队列等待,直到某一个线程释放后进入。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: