您的位置:首页 > 编程语言 > Java开发

Java并发Concurrent包的锁(七)——Semaphore源码分析及使用

2018-03-08 16:38 549 查看
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。

Sync类

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
// 许可总数量,使用AQS的state来保存
Sync(int permits) {
setState(permits);
}
// 获取当前剩余许可数量
final int getPermits() {
return getState();
}
// 非公平的获取指定数量的许可,返回剩余的数量
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// 这里剩余的如果小于0,会造成短路,并没有CAS更新的操作
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 释放指定个数的许可,将其返回给信号量。
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 削减指定个数的许可数量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 获取并返回立即可用的所有许可
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}


NonfairSync

非公平版本的 Sync 。

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}


FairSync

非公平的 Sync 当然要判断队列头节点正在等待的是不是当前线程,如果不是的话当前线程要排队等待。

static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}


构造函数

// 默认的构造函数是非公平的
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

// 给定访问许可总数来构造公平或非公平的信号量
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}


重要方法

acquire()

从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}


acquireUninterruptibly()

从此信号量中获取许可,在有可用的许可前将其阻塞。

public void acquireUninterruptibly() {
sync.acquireShared(1);
}


tryAcquire()

仅在调用时此信号量存在一个可用许可,才从信号量获取许可。

nonfairTryAcquireShared 方法如上边展示的,剩余为负数时,并不更新 state 的值。说明此次获取是失败的。

public boolean tryAcquire() {
// 调用Sync中的方法,如果获取1个资源后剩余的大于等于0,返回true,如果剩余的为负数,返回false。
return sync.nonfairTryAcquireShared(1) >= 0;
}


release()

释放一个许可,将其返回给信号量。

public void release() {
sync.releaseShared(1);
}


acquire(int permits)

从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。

public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}


acquireUninterruptibly(int permits)

从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。

public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}


tryAcquire(int permits)

仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可。

public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}


release(int permits)

释放给定数目的许可,将其返回到信号量。

public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}


availablePermits()

返回此信号量中当前可用的许可数。

public int availablePermits() {
return sync.getPermits();
}


drainPermits()

获取并返回立即可用的所有许可。

public int drainPermits() {
return sync.drainPermits();
}


reducePermits(int reduction)

根据指定的缩减量减小可用许可的数目。

protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}


使用情景

我们以银行办业务的例子来说明,假设银行有 4 个柜台,一次进来了 10 个办理业务的人:

public class TestSemaphore {

private static Semaphore semaphore;

// 银行柜台
public static class BankCounter {

// 办业务
public void banking() throws InterruptedException{
semaphore.acquire();
System.out.println("用户" + Thread.currentThread().getName() + "开始处理!");
// 处理业务的时间
int waitTime = (int)(Math.random()*10 +1);
Thread.sleep(waitTime*1000);
System.out.println("用户" + Thread.currentThread().getName() + "处理结束,共用时 "+ waitTime +" 秒,离开窗口,下一个!");
semaphore.release();
}

}

// 等待窗口办业务的线程
public static class WindowThread extends Thread{

@Override
public void run() {
try {
new BankCounter().banking();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

public static void main(String[] args) throws InterruptedException {

// 信号量设置为4,等于只有4个窗口资源,设置为公平模式排队
semaphore = new Semaphore(4, true);

// 启动10个线程,10个用户在等待办业务,500ms延迟是为了线程编号从小到大公平的排队
for(int i=0; i<10; i++){
new WindowThread().start();
Thread.sleep(500);
}

}

}


运行结果:

用户Thread-0开始处理!
用户Thread-1开始处理!
用户Thread-2开始处理!
用户Thread-3开始处理!
用户Thread-1处理结束,共用时 2 秒,离开窗口,下一个!
用户Thread-4开始处理!
用户Thread-0处理结束,共用时 4 秒,离开窗口,下一个!
用户Thread-5开始处理!
用户Thread-2处理结束,共用时 4 秒,离开窗口,下一个!
用户Thread-6开始处理!
用户Thread-3处理结束,共用时 7 秒,离开窗口,下一个!
用户Thread-7开始处理!
用户Thread-4处理结束,共用时 7 秒,离开窗口,下一个!
用户Thread-8开始处理!
用户Thread-6处理结束,共用时 6 秒,离开窗口,下一个!
用户Thread-9开始处理!
用户Thread-9处理结束,共用时 1 秒,离开窗口,下一个!
用户Thread-5处理结束,共用时 9 秒,离开窗口,下一个!
用户Thread-7处理结束,共用时 8 秒,离开窗口,下一个!
用户Thread-8处理结束,共用时 7 秒,离开窗口,下一个!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐