Java并发Concurrent包的锁(七)——Semaphore源码分析及使用
2018-03-08 16:38
549 查看
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。
nonfairTryAcquireShared 方法如上边展示的,剩余为负数时,并不更新 state 的值。说明此次获取是失败的。
运行结果:
Sync类
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; // 许可总数量,使用AQS的state来保存 Sync(int permits) { setState(permits); } // 获取当前剩余许可数量 final int getPermits() { return getState(); } // 非公平的获取指定数量的许可,返回剩余的数量 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; // 这里剩余的如果小于0,会造成短路,并没有CAS更新的操作 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } // 释放指定个数的许可,将其返回给信号量。 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } // 削减指定个数的许可数量 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } // 获取并返回立即可用的所有许可 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
NonfairSync
非公平版本的 Sync 。static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
FairSync
非公平的 Sync 当然要判断队列头节点正在等待的是不是当前线程,如果不是的话当前线程要排队等待。static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
构造函数
// 默认的构造函数是非公平的 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 给定访问许可总数来构造公平或非公平的信号量 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
重要方法
acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
acquireUninterruptibly()
从此信号量中获取许可,在有可用的许可前将其阻塞。public void acquireUninterruptibly() { sync.acquireShared(1); }
tryAcquire()
仅在调用时此信号量存在一个可用许可,才从信号量获取许可。nonfairTryAcquireShared 方法如上边展示的,剩余为负数时,并不更新 state 的值。说明此次获取是失败的。
public boolean tryAcquire() { // 调用Sync中的方法,如果获取1个资源后剩余的大于等于0,返回true,如果剩余的为负数,返回false。 return sync.nonfairTryAcquireShared(1) >= 0; }
release()
释放一个许可,将其返回给信号量。public void release() { sync.releaseShared(1); }
acquire(int permits)
从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
acquireUninterruptibly(int permits)
从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
tryAcquire(int permits)
仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; }
release(int permits)
释放给定数目的许可,将其返回到信号量。public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
availablePermits()
返回此信号量中当前可用的许可数。public int availablePermits() { return sync.getPermits(); }
drainPermits()
获取并返回立即可用的所有许可。public int drainPermits() { return sync.drainPermits(); }
reducePermits(int reduction)
根据指定的缩减量减小可用许可的数目。protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
使用情景
我们以银行办业务的例子来说明,假设银行有 4 个柜台,一次进来了 10 个办理业务的人:public class TestSemaphore { private static Semaphore semaphore; // 银行柜台 public static class BankCounter { // 办业务 public void banking() throws InterruptedException{ semaphore.acquire(); System.out.println("用户" + Thread.currentThread().getName() + "开始处理!"); // 处理业务的时间 int waitTime = (int)(Math.random()*10 +1); Thread.sleep(waitTime*1000); System.out.println("用户" + Thread.currentThread().getName() + "处理结束,共用时 "+ waitTime +" 秒,离开窗口,下一个!"); semaphore.release(); } } // 等待窗口办业务的线程 public static class WindowThread extends Thread{ @Override public void run() { try { new BankCounter().banking(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { // 信号量设置为4,等于只有4个窗口资源,设置为公平模式排队 semaphore = new Semaphore(4, true); // 启动10个线程,10个用户在等待办业务,500ms延迟是为了线程编号从小到大公平的排队 for(int i=0; i<10; i++){ new WindowThread().start(); Thread.sleep(500); } } }
运行结果:
用户Thread-0开始处理! 用户Thread-1开始处理! 用户Thread-2开始处理! 用户Thread-3开始处理! 用户Thread-1处理结束,共用时 2 秒,离开窗口,下一个! 用户Thread-4开始处理! 用户Thread-0处理结束,共用时 4 秒,离开窗口,下一个! 用户Thread-5开始处理! 用户Thread-2处理结束,共用时 4 秒,离开窗口,下一个! 用户Thread-6开始处理! 用户Thread-3处理结束,共用时 7 秒,离开窗口,下一个! 用户Thread-7开始处理! 用户Thread-4处理结束,共用时 7 秒,离开窗口,下一个! 用户Thread-8开始处理! 用户Thread-6处理结束,共用时 6 秒,离开窗口,下一个! 用户Thread-9开始处理! 用户Thread-9处理结束,共用时 1 秒,离开窗口,下一个! 用户Thread-5处理结束,共用时 9 秒,离开窗口,下一个! 用户Thread-7处理结束,共用时 8 秒,离开窗口,下一个! 用户Thread-8处理结束,共用时 7 秒,离开窗口,下一个!
相关文章推荐
- Java并发Concurrent包的锁(六)——CountDownLatch源码分析及使用
- Java并发Concurrent包的锁(五)——CyclicBarrier源码分析及使用
- Java并发Concurrent包的锁(四)——读写锁源码分析
- Java并发系列之Semaphore源码分析
- Java并发编程 -- Concurrent包源码分析4 -- 各种锁与无锁
- Java并发Concurrent包的锁(三)——ReentrantLock源码分析
- Java 并发 --- Semaphore源码分析
- Java并发编程 -- Concurrent包源码分析4 -- 各种锁与无锁
- Java并发之Semaphore的源码分析
- Java并发包源码学习之AQS框架(四)AbstractQueuedSynchronizer源码分析
- Java Concurrent包源码学习和使用心得 之 LinkedBlockingQueue源码解读
- java学习之旅57、58--数组_StringBuilder和StringBuffer的使用_JDK源码分析内部机制、常用方法补充_常见面试题答法
- java并发:Semaphore 的使用
- 聊聊并发(三)Java线程池的分析和使用
- Java并发包源码学习之线程池(一)ThreadPoolExecutor源码分析
- 聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析
- Android中AsyncTask的使用与源码分析+3.0以前的缺陷(并发->逐一)
- 聊聊高并发(十二)分析java.util.concurrent.atomic.AtomicStampedReference源码来看如何解决CAS的ABA问题
- Java concurrent Framework并发容器之concurrent.atomic包源码分析
- java非并发容器ArrayList 和 LinkedList 优缺点比较及其实现源码分析