java并发编程——八 理解分析并发组件-CountDownLatch\CyclicBarrier\Exchanger\Semaphore
2016-03-26 21:02
881 查看
从这章开始介绍 java.concurrent.*中的常用并发工具类,并附带有趣的示例
按照需要同时阻塞的线程个数count,初始化对象。
CountDownLatch countDownLatch = new CountDownLatch(count);
每次调用countDownLatch.countDown(), count计数器-1。
countDownLatch.await()阻塞当前线程,直到count值为0恢复运行。
适合用在多个线程协助;或用在多个点或多个线程任务之间协作,比如:
使用CountDownLatch对象,并设置一个初始值,
如 CountDownLatch countDownLatch = new CountDownLatch(5).
在Task F中调用countDownLatch.await()阻塞当前线程,直到count值为0时,自动运行。
Task A….Task E ,这5个任务中,调用countDownLatch.countDown(),表示每完成一个任务就将count值减一.
Task A…E运行完之后,Task F将不再阻塞,立刻运行。
以上描述对应代码实现:
CountDownLatch countDownLatch = new CountDownLatch(count);
2.CountDownLatch中实现的同步器(extends AQS)
CountDownLatch也是通过AQS来实现的,我们先来看下这个AQS的内部实现类:
3.countDownLatch.await() 阻塞。因为由共享锁实现,所以这方法可以被多次调用,也就是说可以重复调用这个方法阻塞多个线程
AbstractQueuedSynchronizer.java 详细分析AQS请点击查看之前文章。
使用AQS完成了锁的获取,如果获取锁失败也就是说countDownLatch计数器不为0,那么获取锁失败,以共享模式进入sync队列中等待(park)。
也就是通常new CountDownLatch(count)初始化时,默认代表了count把锁被占用,直到countDown()多次被调用,count==0,各个阻塞线程才能获取锁,从阻塞状态被唤醒。
await()总结
初始化CountDownLatch countDownLatch = new CountDownLatch(count),设置了state同步状态==count,那么每次一个线程中调用countDownLatch.await(),就会执行tryAcquireShared(),因为state!=0所以返回-1,继续执行AQS父类的doAcquireSharedInterruptibly(),当前结点将被构造为一个共享结点,放入Sync同步队里中进行阻塞。
3.countDownLatch.countDown()
——>
当nextc==0表示锁释放处于空闲状态,进入AQS中doReleaseShared()唤醒后继结点(await的线程构造的结点)。
如果对AQS不清楚,请先详解请查看AQS
countDown()总结
多个线程调用了countDownLatch.await()将当前线程阻塞,直到执行countDownLatch.countDown()使state做减法操作,直到state==0时,表示当前线程获取到了锁。
当前线程将进入aqs.doReleaseShared()方法中,唤醒头结点(也就是当前线程代表的结点)的后继结点,后继结点如果都是共享模式的结点,那么会依次唤醒自己的共享后继结点。
所有的await线程将被唤醒。被唤醒的线程继续获取锁,因为CountDownLatch.tryAcquireShared(..)的实现可以保证多个线程获取一把锁,所以后继结点将都获取锁,接着执行doReleaseShared()中的unparkSuccessor,保证了所有共享结点都被立马唤醒。
然后所有之前await的线程,都将从Sync队列中被摘下,从await()上返回继续执行最上层代码。
CountDownLatch 内部定义了一个AQS的子类,通过使用AQS中的方法
tryReleaseShared\tryAcquireShared\releaseShared\doAcquireSharedInterruptibly方法,构造了一个共享锁的组件。
初始化时,通过传入参数,初始化了aqs state的数值。
await() 某个线程调用await,因为state!=0都将被park阻塞,被构造为共享结点进入Sync队列排队。
countDown() 每次调用,state数值都将-1,直到某个线程-1后state==0。接着进入aqs.releaseShared方法释放锁,传播式唤醒后继结点中的所有共享结点(就是所有await等待的线程结点,将会从await方法上返回)
output:
2.cyclicBarrier.await() 当前线程阻塞
小结
内部组合了ReentrantLock的Condition做实现。
每次调用CyclicBarrier.await(),内部计数器会-1,然后阻塞在condition.await()
直到最后一个线程调用CyclicBarrier.await,计数器达到0:唤醒所有等待的线程(condition.notifyALL),同时将计数器置为初始化状态以便下次继续使用。
CountDownLatch vs CyclicBarrier
内部实现:前者内部基于共享模式实现了AQS的子类。后者直接使用ReentrantLock类、Condition类以及一个计数器实现线程之间的同步。
功能:
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重
置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数
器,并让线程重新执行一次。另外,CyclicBarrier的await方法中会自动-1,而CountDownLatch中需要调用countDown().
一个生产者线程生产消息 msgX,然后调用exchanger.exchang(msgX),此时该线程Blocked,等待另一个线程(消费者)去消费:消费者调用 exchanger.exchange(“此参数会传递给生产者线程,并不需要”),获得生成者刚才生成的消息,并唤醒阻塞的生产者.
控制使用资源的并发数量
aqs共享模式实现的共享锁(还有一个是ReentrantReadAndWriteLock)。
可用于对某一资源的限量访问,超过某阈值阻塞后边的进程进入Sync排队阻塞。
同样是使用AQS实现,同时实现了公平与非公平模式(类似ReentrantLock)。源码与之前都很类似,很简单,理解的关键是理解AQS
使用AQS构造了共享模式的锁,不支持重入。当一个线程调用semaphore.acquire回去一把锁,当获取的次数达到阈值,则后续的获取线程会被阻塞进入Sync队列等待,直到某一个线程释放后进入。
CountDownLatch
概要按照需要同时阻塞的线程个数count,初始化对象。
CountDownLatch countDownLatch = new CountDownLatch(count);
每次调用countDownLatch.countDown(), count计数器-1。
countDownLatch.await()阻塞当前线程,直到count值为0恢复运行。
适合用在多个线程协助;或用在多个点或多个线程任务之间协作,比如:
使用CountDownLatch对象,并设置一个初始值,
如 CountDownLatch countDownLatch = new CountDownLatch(5).
在Task F中调用countDownLatch.await()阻塞当前线程,直到count值为0时,自动运行。
Task A….Task E ,这5个任务中,调用countDownLatch.countDown(),表示每完成一个任务就将count值减一.
Task A…E运行完之后,Task F将不再阻塞,立刻运行。
以上描述对应代码实现:
class PortionTask implements Runnable { private static int counter = 0; private int id = counter++; private CountDownLatch countDownLatch; public PortionTask(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { System.out.println("portion task complete " + this + " Thread[" + Thread.currentThread().getName() + "]"); try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } countDownLatch.countDown(); } @Override public String toString() { return "PortionTask [id=" + id + "]"; } } class TaskWaiting implements Runnable { private CountDownLatch countDownLatch; public TaskWaiting(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } @Override public void run() { try { countDownLatch.await(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("it's my turn " + this); } } public class CountDownLatchDemoMine { public static void main(String[] args) { int SIZE = 5; CountDownLatch c = new CountDownLatch(SIZE); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new PortionTask(c));// A Task exec.execute(new PortionTask(c));// B Task exec.execute(new PortionTask(c));// C Task exec.execute(new PortionTask(c));// D Task exec.execute(new PortionTask(c));// E Task exec.execute(new TaskWaiting(c));// Final Task F exec.shutdown(); } }
CountDownLatch源码分析
1.初始化CountDownLatch countDownLatch = new CountDownLatch(count);
//从锁的角度理解,CountDownLatch初始化完成后,就有count把锁占用 //CountDownLatch public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);//对(AQS)sync同步器初始化 } private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) {//入参代表锁state的数值 setState(count);// volatile set } ............
2.CountDownLatch中实现的同步器(extends AQS)
CountDownLatch也是通过AQS来实现的,我们先来看下这个AQS的内部实现类:
//CountDownLatch.Sync /** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) {//入参代表锁state的数值 setState(count); } int getCount() { return getState(); } //使用了共享模式锁,当state==0时表示锁空闲. // -1获取锁失败 1 获取成功 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //共享模式释放锁,尝试获取获取锁 //false:释放共享锁失败 //true:调用AQS方法唤醒sync的后继等待结点 protected boolean tryReleaseShared(int releases) { // count递减,直到为0后唤醒阻塞的线程 for (;;) { int c = getState(); if (c == 0) return false;//锁已经被释放 int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0;//表是锁已经被完全释放 } } }
3.countDownLatch.await() 阻塞。因为由共享锁实现,所以这方法可以被多次调用,也就是说可以重复调用这个方法阻塞多个线程
//CountDownLatch public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);//共享式 可中断获取锁 }
AbstractQueuedSynchronizer.java 详细分析AQS请点击查看之前文章。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted())//响应中断 throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//上文已述。state!=0的时候都会返回true //doAcquireSharedInterruptibly主要逻辑(详细请参考AQS文章): //1.构造当前线程为共享结点; //2.加入到sync队列尾部; //3.再次尝试获取共享锁,成功则传播式唤醒所有后继共享结点。 //4.获取锁失败park阻塞,直到被前驱唤醒。 doAcquireSharedInterruptibly(arg); }
//CountDownLatch.Sync //使用了共享模式锁,当state==0时表示锁空闲。那么通常new CountDownLatch(count)初始化后state==count,返回-1。 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
使用AQS完成了锁的获取,如果获取锁失败也就是说countDownLatch计数器不为0,那么获取锁失败,以共享模式进入sync队列中等待(park)。
也就是通常new CountDownLatch(count)初始化时,默认代表了count把锁被占用,直到countDown()多次被调用,count==0,各个阻塞线程才能获取锁,从阻塞状态被唤醒。
await()总结
初始化CountDownLatch countDownLatch = new CountDownLatch(count),设置了state同步状态==count,那么每次一个线程中调用countDownLatch.await(),就会执行tryAcquireShared(),因为state!=0所以返回-1,继续执行AQS父类的doAcquireSharedInterruptibly(),当前结点将被构造为一个共享结点,放入Sync同步队里中进行阻塞。
3.countDownLatch.countDown()
/** * Decrements the count of the latch, releasing all waiting threads if * the count reaches zero. * * <p>If the current count is greater than zero then it is decremented. * If the new count is zero then all waiting threads are re-enabled for * thread scheduling purposes. * * <p>If the current count equals zero then nothing happens. * 每次一个线程执行countDown(),state都会-1. */ public void countDown() { sync.releaseShared(1); }
——>
//AbstractQueuedSynchronizer.releaseShared: /** * Releases in shared mode. Implemented by unblocking one or more * threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to * {@link #tryReleaseShared} but is otherwise uninterpreted * and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//state--,当state==0时执行这个方法,执行AQS的doReleaseShared(),所有Sync队列中的共享结点将被依次唤醒。 return true; } return false; }
//CountDownLatch.Sync.tryReleaseShared(1) //共享模式释放锁,尝试获取获取锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false;//锁已经被释放 int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0;//true:state到达0.调用AQS方法唤醒sync的后继等待结点 } } }
当nextc==0表示锁释放处于空闲状态,进入AQS中doReleaseShared()唤醒后继结点(await的线程构造的结点)。
如果对AQS不清楚,请先详解请查看AQS
countDown()总结
多个线程调用了countDownLatch.await()将当前线程阻塞,直到执行countDownLatch.countDown()使state做减法操作,直到state==0时,表示当前线程获取到了锁。
当前线程将进入aqs.doReleaseShared()方法中,唤醒头结点(也就是当前线程代表的结点)的后继结点,后继结点如果都是共享模式的结点,那么会依次唤醒自己的共享后继结点。
所有的await线程将被唤醒。被唤醒的线程继续获取锁,因为CountDownLatch.tryAcquireShared(..)的实现可以保证多个线程获取一把锁,所以后继结点将都获取锁,接着执行doReleaseShared()中的unparkSuccessor,保证了所有共享结点都被立马唤醒。
然后所有之前await的线程,都将从Sync队列中被摘下,从await()上返回继续执行最上层代码。
CountDownLatch 整体总结
概述:通过内部state计数的方式,协调多个线程的阻塞,并使它们在某一时刻同时唤醒。CountDownLatch 内部定义了一个AQS的子类,通过使用AQS中的方法
tryReleaseShared\tryAcquireShared\releaseShared\doAcquireSharedInterruptibly方法,构造了一个共享锁的组件。
初始化时,通过传入参数,初始化了aqs state的数值。
await() 某个线程调用await,因为state!=0都将被park阻塞,被构造为共享结点进入Sync队列排队。
countDown() 每次调用,state数值都将-1,直到某个线程-1后state==0。接着进入aqs.releaseShared方法释放锁,传播式唤醒后继结点中的所有共享结点(就是所有await等待的线程结点,将会从await方法上返回)
CyclicBarrier
直接上代码,一个很多有趣的短跑比赛模拟程序,详见注释,看完你就理解了CyclicBarrier:class Player implements Runnable { private static int counter = 0; private int id = counter++; private int moveDistance; private static CyclicBarrier barrier; private static Random random = new Random(47); public Player(CyclicBarrier b) { barrier = b; } public int getMoveDistance() { return moveDistance; } @Override public void run() { while (!Thread.interrupted()) { moveDistance += random.nextInt(3); try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } @Override public String toString() { return "" + id; } } public class Race { private int playerNum = 5; private int finalLine = 20; private CyclicBarrier barrier;// 唯一的CyclicBarrier对象 private List<Player> players = new ArrayList<Player>(); private ExecutorService es = Executors.newCachedThreadPool(); public Race() { // 1 构建CyclicBarrier(int parties, Runnable barrierAction) // 当n个player都在这个barrier上调用了barrier.await(),这个barrier // 对象将重置,并使用最后一个调用await的线程去运行barrierAction, // 唤醒所有wait的线程, // 然后再一次等待nHourse全部执行await....周而复始;直到shutdownNow // 查看CyclicBarrier构造器的JDoc,有详细描述 barrier = new CyclicBarrier(playerNum, new Runnable() { @Override public void run() { // 1.打印赛道 StringBuffer raceLine = new StringBuffer(); raceLine.append(" "); for (int i = 0; i < finalLine; i++) { raceLine.append("="); } System.out.println(" 100米男子赛道"); System.out.println(raceLine); // 2.打印当前参赛者行进位置 for (Player player : players) { StringBuffer playerLocation = new StringBuffer(); playerLocation.append(player); for (int i = 0; i < player.getMoveDistance(); i++) { playerLocation.append("*"); } System.out.println(playerLocation); } // 3.检查是否有参赛者完成比赛,如果是,结束比赛; // (会存在多个player同时越过终点) for (Player player : players) { if (player.getMoveDistance() >= finalLine) { System.out.println("bravo: player" + player + " won!!! "); es.shutdownNow();// 该方法并不保证池中所有线程退出 return; } } try { TimeUnit.MILLISECONDS.sleep(80); } catch (InterruptedException e) { e.printStackTrace(); } } }); // 2.任务执行:所有player开始跑 for (int i = 0; i < playerNum; i++) { // 每个Player中传入同一个CyclicBarrier对象 Player p = new Player(barrier); players.add(p); es.execute(p); } } public static void main(String[] args) { new Race(); } }
output:
100米男子赛道 ========== 0** 1** 2* 3** 4* .............................. 100米男子赛道 ========== 0******* 1********* 2***** 3****** 4****** 100米男子赛道 ========== 0******** 1********** 2***** 3******* 4****** bravo: player1 won!!!
CyclicBarrier源码分析
1.构造 内部组合了ReentrantLock以及Condition//CyclicBarrier /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** 需要await的次数*/ private final int parties; /* 当逃脱发生时(也就是parties次调用了这个await()),执行自定义的任务 */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * 参与者(等待者)的数量。 * 当count达到0,则count会被重置为初始值parties * *因为使用Condition的前提是已经获取到了RenntrantLock的锁,所以这个变量不需要使用volatile修饰 */ private int count; public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
2.cyclicBarrier.await() 当前线程阻塞
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } } private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock();//获取ReentrantLock锁 try { final Generation g = generation; if (g.broken)//默认false throw new BrokenBarrierException(); if (Thread.interrupted()) {//对中断响应 breakBarrier();//当前线程中断;signalAll同时唤醒其他等待者 throw new InterruptedException(); } int index = --count;//在await()中执行count--.(与CountDownLatch) if (index == 0) { // tripped 跳脱。parites次调用了await。那么index==0 boolean ranAction = false; try { /* 当逃脱发生时(也就是parties次调用了这个await()),可以直接执行自定义的任务 */ final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration();//主要逻辑:signalAll();重置count return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await();//使用Condition.await 阻塞 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken)//响应中断抛出异常 throw new BrokenBarrierException(); if (g != generation)//count已经重置了,新的一轮已经开始 return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } private void nextGeneration() { // 唤醒所有在这个condition上park等待的线程.也就是将这些线程对应的结点放入sync队列中。 trip.signalAll(); // set up next generation count = parties;//重置count generation = new Generation(); }
小结
内部组合了ReentrantLock的Condition做实现。
每次调用CyclicBarrier.await(),内部计数器会-1,然后阻塞在condition.await()
直到最后一个线程调用CyclicBarrier.await,计数器达到0:唤醒所有等待的线程(condition.notifyALL),同时将计数器置为初始化状态以便下次继续使用。
CountDownLatch vs CyclicBarrier
内部实现:前者内部基于共享模式实现了AQS的子类。后者直接使用ReentrantLock类、Condition类以及一个计数器实现线程之间的同步。
功能:
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重
置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数
器,并让线程重新执行一次。另外,CyclicBarrier的await方法中会自动-1,而CountDownLatch中需要调用countDown().
Exchanger
示例描述:一个生产者线程生产消息 msgX,然后调用exchanger.exchang(msgX),此时该线程Blocked,等待另一个线程(消费者)去消费:消费者调用 exchanger.exchange(“此参数会传递给生产者线程,并不需要”),获得生成者刚才生成的消息,并唤醒阻塞的生产者.
public class ExchangerTest { private static final Exchanger<List<String>> exgr = new Exchanger<List<String>>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(10); private static Random rand = new Random(47); private static Runnable producerProductMsg() { return new Runnable() { @Override public void run() { List<String> producerMsgX = new ArrayList<String>(); for (int i = 0; i < 10; i++) { producerMsgX.add(i + rand.nextInt(100) + ""); } try { // 生产,阻塞 // Waits for another thread to arrive at this exchange // point exgr.exchange(producerMsgX); System.out.println(" ThreadName:" + Thread.currentThread().getName() + " producerProductMsg:" + producerMsgX); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; } private static Runnable consumerConsumeMsg() { return new Runnable() { @Override public void run() { try { List<String> getMsgFromProducer = exgr.exchange(new ArrayList<String>()); System.out.println(" ThreadName:" + Thread.currentThread().getName() + " consumerConsumeMsg:" + getMsgFromProducer); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; } public static void main(String[] args) { // 消息生成者 threadPool.execute(producerProductMsg()); // 消费者 消费 threadPool.execute(consumerConsumeMsg()); threadPool.shutdown(); } }
Semaphore
要点控制使用资源的并发数量
对象池实现示例
class Fat { private volatile double d; private static int counter = 0; private int id = counter++; public Fat() { for (int i = 0; i < 10000; i++) { d += Math.PI + Math.E; } } public void operation() { System.out.println("opreation:" + this); } @Override public String toString() { return "Fat [id=" + id + "]"; } } public class Pool<T> { private int size; private List<T> items = new ArrayList<T>(); private volatile boolean[] checkOut; private Semaphore available; public Pool(Class<T> classObj, int size) { this.size = size; checkOut = new boolean[size]; available = new Semaphore(size, true); // 初始化对象池 for (int i = 0; i < size; i++) { try { items.add(classObj.newInstance()); } catch (Exception e) { throw new RuntimeException(e); } } } private synchronized T getItem() { for (int i = 0; i < size; i++) { if (!checkOut[i]) {// 未被取出,那么取出使用 checkOut[i] = true; return items.get(i); } } return null;// Semaphore 阻止了对象池被使用空的情况 } private synchronized boolean releaseItem(T x) { int index = items.indexOf(x); if (-1 == index) { return false;// 该对象x并不在池中 } if (checkOut[index]) {// 该对象不在池中 checkOut[index] = false; return true; } return false;// 对象未被从池中取出 } public T checkOut() throws InterruptedException { available.acquire(); return getItem(); } public void checkIn(T x) { if (releaseItem(x)) { available.release(); } } }
class CheckoutTast<T> implements Runnable { private static int counter = 0; private int id = counter++; private Pool<T> objPool; public CheckoutTast(Pool<T> objPool) { this.objPool = objPool; } @Override public void run() { try { T item = objPool.checkOut(); System.out.println(this + " checked out " + item); TimeUnit.SECONDS.sleep(1); System.out.println(this + " checked in " + item); objPool.checkIn(item); } catch (Exception e) { e.printStackTrace(); } } @Override public String toString() { return "CheckoutTast [id=" + id + "]"; } } public class SemaphoreDemo { final static int SIZE = 5; public static void main(String[] args) throws InterruptedException { final Pool<Fat> pool = new Pool<Fat>(Fat.class, SIZE); ExecutorService exec = Executors.newCachedThreadPool(); for (int i = 0; i < SIZE; i++) { exec.execute(new CheckoutTast<Fat>(pool)); } System.out.println("All checkout Tasks were created,size:" + SIZE); List<Fat> list = new ArrayList<Fat>(); // 取出池中所有的对象 for (int i = 0; i < SIZE; i++) { Fat f = pool.checkOut(); System.out.println(">>>>" + i + "[main Thread checked out]:" + f); f.operation(); list.add(f); } // 池中没有客用对象,由于使用了Semaphore,阻塞 Future<?> blocked = exec.submit(new Runnable() { @Override public void run() { // Semaphore 会阻塞超额的CheckOut try { pool.checkOut(); } catch (InterruptedException e) { System.out.println("blocked interrupted"); } } }); TimeUnit.SECONDS.sleep(5); blocked.cancel(true); System.out.println(); // 返回池中所有对象 for (Fat f : list) { pool.checkIn(f); } for (Fat f : list) {// 此时checkIn总是返回False,冗余checkIn被忽略 pool.checkIn(f); } exec.shutdown(); } }
Semaphore源码分析
通常用来控制并发访问资源的线程数量,比如连接池。aqs共享模式实现的共享锁(还有一个是ReentrantReadAndWriteLock)。
可用于对某一资源的限量访问,超过某阈值阻塞后边的进程进入Sync排队阻塞。
同样是使用AQS实现,同时实现了公平与非公平模式(类似ReentrantLock)。源码与之前都很类似,很简单,理解的关键是理解AQS
public Semaphore(int permits) { sync = new NonfairSync(permits);//初始化。默认使用非公平锁 } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //AQS实现 AQS——>Sync——>NonfairSync\FairSync abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits);// 初始化state } final int getPermits() { return getState(); } // 非公平模式 获取共享锁 final int nonfairTryAcquireShared(int acquires) { for (;;) {// 自旋 int available = getState(); int remaining = available - acquires;// (允许进入的线程)剩余个数 if (remaining < 0 || compareAndSetState(available, remaining))// 没有达到阈值并且CAS获取锁状态成功 return remaining;// 返回剩余个数 // 如果位负数表示已经达到阈值,会阻塞当前线程进入AQSsync队列排队 } } protected final boolean tryReleaseShared(int releases) { for (;;) {// 自旋 int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true;// 唤醒后继等待结点 } } final void reducePermits(int reductions) { for (;;) {// 自旋 int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) {// 自旋 int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires);// 通过自旋方式获取锁,如果达到阈值,那么waiting } } /** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) {// 通过自旋方式获取锁,无waiting if (hasQueuedPredecessors())// 保证公平性 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
使用AQS构造了共享模式的锁,不支持重入。当一个线程调用semaphore.acquire回去一把锁,当获取的次数达到阈值,则后续的获取线程会被阻塞进入Sync队列等待,直到某一个线程释放后进入。
相关文章推荐
- java volatile 详解
- Java学习经验
- Java实现快速排序
- # 20145210 《Java程序设计》第04周学习总结
- 学习Java JDBC,看这篇就够了
- [leetcode]Remove Duplicates from Sorted Array II
- JAVA多线程开发——线程的创建与运行
- Java静态内部类,普通内部类的理解
- Java——UDP
- java事务学习笔记(四)--成功的案例(自己实现一个线程安全的TransactionManager)
- Java格式化快捷键Eclipse
- 深入线程:生产者与消费者问题
- java中多态的经典问题分析
- myeclipse安装svn插件并使用(本地link链接式安装插件-非侵入式安装)
- JAVA GC机制
- Spring-AOP
- 详解spring-MVC DispatcherServlet初始化
- java web图片显示到浏览器
- Myeclipse怎么导入工程项目
- java按照指定编码方式读取文件