您的位置:首页 > 其它

高并发学习笔记(八)

2019-04-09 21:47 78 查看

四、同步器的实现(四)

4.Semaphore

    Semaphore是一个信号计数量,用于维护一个资源的使用许可量,如果维护的资源使用许可超出会阻塞每一个请求该资源的许可(acquire),直到又有可用的。Semaphore通常用于限制访问资源的线程数目,下面是一个使用示例:

/**
* Semaphore使用示例,Semaphore代表厕所坑位
* Created by bzhang on 2019/3/21.
*/
public class TestSemaphore {
private Semaphore lock = new Semaphore(3);      //厕所坑位

public void goToToilet(){
try {
lock.acquire();   //尝试获取一个位置
System.out.println(Thread.currentThread().getName()+"正在使用厕所");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"上完厕所,很开心");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.release();   //释放一个位置
}
}

public static void main(String[] args) {
TestSemaphore test = new TestSemaphore();
ExecutorService pool = Executors.newCachedThreadPool();     //缓存线程池
for (int i = 0;i < 11;i++){
pool.submit(new Runnable() {
@Override
public void run() {
test.goToToilet();
}
});
}
pool.shutdown();    //关闭线程池
}
}

    Semaphore的使用与ReentrantLock和synchronized的使用很相似,只是锁的使用一般都是独占的,即一次只允许一个线程运行。而Semaphore则是许可多个线程进行访问,当Semaphore只许可一个线程访问时,也就退化成锁。

    Semaphore的底层也是由AQS同步器实现的:

public class Semaphore implements java.io.Serializable {
//AQS同步的队列引用
private final Sync sync;

//由构造方法可以看出Semaphore也分为公平模式和非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

//根据fair创建公平同步队列或是非公平同步队列
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}

 

    Semaphore的继承关系如下图,是由三个内部类Sync,NonfairSync与FairSync共同来实现Semaphore的信号计数功能。

    下面先来看公平与非公平模式的实现源码:

//非公平模式的实现
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}

//重写的实AQS中的tryAcquireShared方法,说明使用的是共享模式
//尝试获取锁
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);    //调用父类的非公平模式下尝试获取锁方法
}
}

//公平模式的实现
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}

//尝试获取锁
protected int tryAcquireShared(int acquires) {
for (;;) {    //自旋
//同步队列中是否存在排在当前线程前面的线程结点
//也就是说当前线程是不是等待最久的线程,不是的话,直接不让获取锁
if (hasQueuedPredecessors())
return -1;
int available = getState();    //获取同步状态
int remaining = available - acquires;    //将要更新的同步状态值
//remaining小于0,说明没有可用的资源了,返回获取失败
//remaining大于等于0且CAS方式更新状态失败,则表示有可用资源,继续循环尝试更新直到成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

//公平锁与非公平锁父类
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}

//获取剩余许可资源数
final int getPermits() {
return getState();
}

//非公平尝试获取锁,与公平模式基本相似,只是不需要判断当前线程是不是等待最久的线程
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

//释放锁资源
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)     //超出最大许可值,抛异常
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

//减少许可数,即假设当前许可数是5,调用该方法会时许可数变为5-reductions
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

//获取可立即使用的许可数
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

    了解了底层实现,再看看Semaphore获取许可和释放许可的过程:

//获取一个许可,在提供一个许可前一直将线程阻塞,或线程被中断
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

//AQS中的方法,可被中断的获取锁的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();

//tryAcquireShared调用的是公平锁FairSync或NonfairSync中的重写方法
//tryAcquireShared可能失败,进入doAcquireSharedInterruptibly方法进行自旋或加入同步队列
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

//尝试获取一个许可,在有可用的许可前将其阻塞,且不可被中断
public void acquireUninterruptibly() {
sync.acquireShared(1);
}

//AQS中的方法
public final void acquireShared(int arg) {
//尝试获取许可失败的话,会进行自旋或加入到同步队列中等待获取锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);    //AQS中已分析过,不再深入
}

//尝试获取一个许可
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;    //只进行一次方式,失败后直接返回+
}

//在一定时间内尝试获取许可
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//AQS中的在一定时间内尝试获取锁的方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//本质上还是先调用tryAcquireShared方法,不成功再在一定时间去自旋或加入同步队列等待获取许可
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}

//获取多个许可,在提供permits许可前一直将线程阻塞,或线程被中断
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

//获取给定数目的许可,在提供这些许可前一直将线程阻塞,不可被中断
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

//尝试获取给定数目的许可,值尝试一次,失败直接返回
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}

//在给定的时间内尝试获取给定数目的许可数,超时返回失败
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

//释放许可的过程

//释放一个许可
public void release() {
sync.releaseShared(1);
}

//AQS中的方法
public final boolean releaseShared(int arg) {
//尝试释放一个许可,不成功则进入doReleaseShared继续尝试释放许可
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

//释放给定数目的许可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: