您的位置:首页 > 编程语言 > Java开发

java 并发工具包 -信号量 Semaphore

2017-12-09 00:21 585 查看

简介:

Semaphore,信号量,用于控制同时访问的线程数据,如果达到阀值,那么只能等到某线程释放才能被其他线程继续访问。

源码解析:

public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
//和CountDownLatch 一样,定义内部类Sync继承AbstractQueuedSynchronizer ,相对而言,对比CountDownLatch 重写父类方法多一些
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//初始化计数器
Sync(int permits) {
setState(permits);
}
//获取计数器
final int getPermits() {
return getState();
}
//非公平的获取 计数器-acquires 即消耗了acquires个位置
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
//如果remining<0了的话 就不执行compareAndSetState了 并且返回remining
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//计数器+releases 即增加了releases 个位置 并释放了头部线程
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
//releases不能为负数
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
//计数器-reductions 即消耗了reductions个位置
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
//reductions不能为负数
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//返回当前计数器值 并设置为0  即全部占有
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

/**
*非公平模式
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

/**
* 公平模式
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}
//和非公平模式不一样的地方
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果当前线程不在头部节点并且头结点和尾节点不相等 返回-1
if (hasQueuedPredecessors())
return -1;
//同公平模式
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

//初始化默认使用的非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

//指定模式
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public void acquire() throws InterruptedException {
//可打断  无位置则阻塞
sync.acquireSharedInterruptibly(1);
}

//不可打断 无位置则阻塞
public void acquireUninterruptibly() {
sync.acquireShared(1);
}

//尝试获取 直接调用的就是非公平模式  不阻塞
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

//尝试获取 阻塞直至超时返回
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

//释放 并释放一个位置
public void release() {
sync.releaseShared(1);
}

//可打断 获取permits位置  一直阻塞 直到获取成功
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

//不可打断 获取permits位置   一直阻塞 直到获取成功
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

//尝试获取permits位置 不阻塞
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}

//尝试获取permits位置  阻塞直至时间过期
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

//释放permits位置 且唤醒线程
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

//将剩余的位置全部占用
public int drainPermits() {
return sync.drainPermits();
}

//释放reduction位置 但不去唤醒
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}

//释放公平模式
public boolean isFair() {
return sync instanceof FairSync;
}

//是否拥有等待的线程  其实就是去判断头部和尾部是否不相等
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

//获取等待的线程长度
public final int getQueueLength() {
return sync.getQueueLength();
}

//返回所有线程
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}


简单例子 :

public class SemaphoreTest {
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {

@Override
public void run() {
try {
semaphore.acquire();
System.out.println("线程" + Thread.currentThread().getName() + "执行中");
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
semaphore.release();
}
System.out.println("线程" + Thread.currentThread().getName() + "执行完毕");

}
}, "thread" + i).start();
}

}
}


总结:

“公平信号量”和”非公平信号量”的释放信号量的机制是一样的!不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在队列的头部,它都会直接获取信号量。

Semaphore通过acquire和release方法来进行获取和释放,可以不停的进行操作,用于控制线程数。

Semaphore也用到了AbstractQueuedSynchronizer,那么 下一篇就来解析下著名的AQS
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  并发 semaphore