您的位置:首页 > 编程语言 > Java开发

Jdk1.8版本Semaphore实现源码分析

2018-03-09 13:57 686 查看

一、Semaphore

      Semaphore,信号量,常用在限流的场景,一般用来构建一些对象池,资源池之类的,Semaphore用来管理一组许可,当没有许可时,线程阻塞等待其他线程用完后归还许可后阻塞的线程才能获取许可。Semaphore里面分为了公平和非公平模式,默认是非公平的。

二、看个例子

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreExample {

public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(8);
final Semaphore semaphore = new Semaphore(5);
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 1; i <= 8; i++) {
final int index = i;
service.submit(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("当前线程:" + Thread.currentThread().getName() + " 获得许可" );
//不释放锁,模拟业务操作
Thread.sleep(1000);
System.out.println("当前线程:" + Thread.currentThread().getName() + "释放许可");
//System.out.println("当前允许进入的最大任务数:" + semaphore.availablePermits());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
if(index == 8){
countDownLatch.countDown();
}
}

}
});
}
try {
System.out.println("main线程等待:" + Thread.currentThread().getName());
countDownLatch.await();
System.out.println("模拟执行完毕.....");
System.out.println("关闭线程池");
service.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}     可能的运行结果:
   
当前线程:pool-1-thread-1 获得许可
当前线程:pool-1-thread-4 获得许可
当前线程:pool-1-thread-3 获得许可
当前线程:pool-1-thread-5 获得许可
当前线程:pool-1-thread-2 获得许可
main线程等待:main
当前线程:pool-1-thread-1释放许可
当前线程:pool-1-thread-2释放许可
当前线程:pool-1-thread-3释放许可
当前线程:pool-1-thread-6 获得许可
当前线程:pool-1-thread-7 获得许可
当前线程:pool-1-thread-8 获得许可
当前线程:pool-1-thread-4释放许可
当前线程:pool-1-thread-5释放许可
当前线程:pool-1-thread-6释放许可
当前线程:pool-1-thread-8释放许可
模拟执行完毕.....
关闭线程池
当前线程:pool-1-thread-7释放许可
     整个实现比较简单,没许可就等待,否则执行任务处理即可。

三、源码分析

  
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;

/**
* Synchronization implementation for semaphore.  Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

//Semaphore内部使用CAS里面的原子Int域来表示可用的信号量
Sync(int permits) {
setState(permits);
}
//获取可用的许可数

final int getPermits() {
return getState();
}

//非公平模式下获取许可证
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//申请许可之前先要获取当前可用的许可数
int available = getState();
//当前可用的许可数--申请的许可 = 剩下的许可
int remaining = available - acquires;
//如果可以申请获取许可,那么CAS设置还可用的许可数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

//释放许可

protected final boolean tryReleaseShared(int releases) {
//不停循环

f79e
for (;;) {
//获取当前可用的许可
int current = getState();
//既然是释放,那么就应该重新重置可用的许可数(原来的许可+本次释放的许可)
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS设置新的可用许可数
if (compareAndSetState(current, next))
return true;
}
}
//减少许可,也是通过CAS实现

final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

//重置可用许可数,将其重置为0
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}
//公平模式下获取许可

protected int tryAcquireShared(int acquires) {
for (;;) {
//既然是公平模式,那么在申请许可的时候需要先看一下当前队列里面还有没有比当前线程更早
//到达队列的线程在等待获取许可
if (hasQueuedPredecessors())
return -1;
//后面的处理和前面的一样
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

/**
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
*        This value may be negative, in which case releases
*        must occur before any acquires will be granted.
*/
//默认是非公平版本
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
*        This value may be negative, in which case releases
*        must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
*        first-in first-out granting of permits under contention,
*        else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

/**
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

/**
* Acquires a permit from this semaphore, blocking until one is
* available.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit.
*
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it will continue to wait, but the
* time at which the thread is assigned a permit may change compared to
* the time it would have received the permit had no interruption
* occurred.  When the thread does return from this method its interrupt
* status will be set.
*/
//带线程中断的,获取许可
public void acquireUninterruptibly() {
sync.acquireShared(1);
}

/**
* Acquires a permit from this semaphore, only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code false}.
*
* <p>Even when this semaphore has been set to use a
* fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
* immediately acquire a permit if one is available, whether or not
* other threads are currently waiting.
* This "barging" behavior can be useful in certain
* circumstances, even though it breaks fairness. If you want to honor
* the fairness setting, then use
* {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
* which is almost equivalent (it also detects interruption).
*
* @return {@code true} if a permit was acquired and {@code false}
*         otherwise
*/
//不带线程中断的 非公平版本获取许可
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

/**
* Acquires a permit from this semaphore, if one becomes available
* within the given waiting time and the current thread has not
* been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of three things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If a permit is acquired then the value {@code true} is returned.
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* to acquire a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned.  If the time is less than or equal to zero, the method
* will not wait at all.
*
* @param timeout the maximum time to wait for a permit
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if a permit was acquired and {@code false}
*         if the waiting time elapsed before a permit was acquired
* @throws InterruptedException if the current thread is interrupted
*/
//带超时时间的获取许可
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/**
* Releases a permit, returning it to the semaphore.
*
* <p>Releases a permit, increasing the number of available permits by
* one.  If any threads are trying to acquire a permit, then one is
* selected and given the permit that was just released.  That thread
* is (re)enabled for thread scheduling purposes.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link #acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*/
//释放许可
public void release() {
sync.releaseShared(1);
}

/**
* Acquires the given number of permits from this semaphore,
* blocking until all are available,
* or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires the given number of permits, if they are available,
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* <p>If insufficient permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes one of the {@link #release() release}
* methods for this semaphore, the current thread is next to be assigned
* permits and the number of available permits satisfies this request; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
* Any permits that were to be assigned to this thread are instead
* assigned to other threads trying to acquire permits, as if
* permits had been made available by a call to {@link #release()}.
*
* @param permits the number of permits to acquire
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

/**
* Acquires the given number of permits from this semaphore,
* blocking until all are available.
*
* <p>Acquires the given number of permits, if they are available,
* and returns immediately, reducing the number of available permits
* by the given amount.
*
* <p>If insufficient permits are available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* some other thread invokes one of the {@link #release() release}
* methods for this semaphore, the current thread is next to be assigned
* permits and the number of available permits satisfies this request.
*
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for permits then it will continue to wait and its
* position in the queue is not affected.  When the thread does return
* from this method its interrupt status will be set.
*
* @param permits the number of permits to acquire
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

/**
* Acquires the given number of permits from this semaphore, only
* if all are available at the time of invocation.
*
* <p>Acquires the given number of permits, if they are available, and
* returns immediately, with the value {@code true},
* reducing the number of available permits by the given amount.
*
* <p>If insufficient permits are available then this method will return
* immediately with the value {@code false} and the number of available
* permits is unchanged.
*
* <p>Even when this semaphore has been set to use a fair ordering
* policy, a call to {@code tryAcquire} <em>will</em>
* immediately acquire a permit if one is available, whether or
* not other threads are currently waiting.  This
* "barging" behavior can be useful in certain
* circumstances, even though it breaks fairness. If you want to
* honor the fairness setting, then use {@link #tryAcquire(int,
* long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }
* which is almost equivalent (it also detects interruption).
*
* @param permits the number of permits to acquire
* @return {@code true} if the permits were acquired and
*         {@code false} otherwise
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}

/**
* Acquires the given number of permits from this semaphore, if all
* become available within the given waiting time and the current
* thread has not been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires the given number of permits, if they are available and
* returns immediately, with the value {@code true},
* reducing the number of available permits by the given amount.
*
* <p>If insufficient permits are available then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until one of three things happens:
* <ul>
* <li>Some other thread invokes one of the {@link #release() release}
* methods for this semaphore, the current thread is next to be assigned
* permits and the number of available permits satisfies this request; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If the permits are acquired then the value {@code true} is returned.
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* to acquire the permits,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
* Any permits that were to be assigned to this thread, are instead
* assigned to other threads trying to acquire permits, as if
* the permits had been made available by a call to {@link #release()}.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned.  If the time is less than or equal to zero, the method
* will not wait at all.  Any permits that were to be assigned to this
* thread, are instead assigned to other threads trying to acquire
* permits, as if the permits had been made available by a call to
* {@link #release()}.
*
* @param permits the number of permits to acquire
* @param timeout the maximum time to wait for the permits
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if all permits were acquired and {@code false}
*         if the waiting time elapsed before all permits were acquired
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

/**
* Releases the given number of permits, returning them to the semaphore.
*
* <p>Releases the given number of permits, increasing the number of
* available permits by that amount.
* If any threads are trying to acquire permits, then one
* is selected and given the permits that were just released.
* If the number of available permits satisfies that thread's request
* then that thread is (re)enabled for thread scheduling purposes;
* otherwise the thread will wait until sufficient permits are available.
* If there are still permits available
* after this thread's request has been satisfied, then those permits
* are assigned in turn to other threads trying to acquire permits.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link Semaphore#acquire acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permits the number of permits to release
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

/**
* Returns the current number of permits available in this semaphore.
*
* <p>This method is typically used for debugging and testing purposes.
*
* @return the number of permits available in this semaphore
*/
public int availablePermits() {
return sync.getPermits();
}

/**
* Acquires and returns all permits that are immediately available.
*
* @return the number of permits acquired
*/
public int drainPermits() {
return sync.drainPermits();
}

/**
* Shrinks the number of available permits by the indicated
* reduction. This method can be useful in subclasses that use
* semaphores to track resources that become unavailable. This
* method differs from {@code acquire} in that it does not block
* waiting for permits to become available.
*
* @param reduction the number of permits to remove
* @throws IllegalArgumentException if {@code reduction} is negative
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}

/**
* Returns {@code true} if this semaphore has fairness set true.
*
* @return {@code true} if this semaphore has fairness set true
*/
public boolean isFair() {
return sync instanceof FairSync;
}

/**
* Queries whether any threads are waiting to acquire. Note that
* because cancellations may occur at any time, a {@code true}
* return does not guarantee that any other thread will ever
* acquire.  This method is designed primarily for use in
* monitoring of the system state.
*
* @return {@code true} if there may be other threads waiting to
*         acquire the lock
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

/**
* Returns an estimate of the number of threads waiting to acquire.
* The value is only an estimate because the number of threads may
* change dynamically while this method traverses internal data
* structures.  This method is designed for use in monitoring of the
* system state, not for synchronization control.
*
* @return the estimated number of threads waiting for this lock
*/
public final int getQueueLength() {
return sync.getQueueLength();
}

/**
* Returns a collection containing threads that may be waiting to acquire.
* Because the actual set of threads may change dynamically while
* constructing this result, the returned collection is only a best-effort
* estimate.  The elements of the returned collection are in no particular
* order.  This method is designed to facilitate construction of
* subclasses that provide more extensive monitoring facilities.
*
* @return the collection of threads
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}

/**
* Returns a string identifying this semaphore, as well as its state.
* The state, in brackets, includes the String {@code "Permits ="}
* followed by the number of permits.
*
* @return a string identifying this semaphore, as well as its state
*/
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}
 
      整体使用比较简单,源码也很清晰,就不多说了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: