java 并发工具包 -信号量 Semaphore
2017-12-09 00:21
585 查看
简介:
Semaphore,信号量,用于控制同时访问的线程数据,如果达到阀值,那么只能等到某线程释放才能被其他线程继续访问。源码解析:
public class Semaphore implements java.io.Serializable { private static final long serialVersionUID = -3222578661600680210L; //和CountDownLatch 一样,定义内部类Sync继承AbstractQueuedSynchronizer ,相对而言,对比CountDownLatch 重写父类方法多一些 private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; //初始化计数器 Sync(int permits) { setState(permits); } //获取计数器 final int getPermits() { return getState(); } //非公平的获取 计数器-acquires 即消耗了acquires个位置 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; //如果remining<0了的话 就不执行compareAndSetState了 并且返回remining if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //计数器+releases 即增加了releases 个位置 并释放了头部线程 protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; //releases不能为负数 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } //计数器-reductions 即消耗了reductions个位置 final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; //reductions不能为负数 if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //返回当前计数器值 并设置为0 即全部占有 final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } } /** *非公平模式 */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } /** * 公平模式 */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } //和非公平模式不一样的地方 protected int tryAcquireShared(int acquires) { for (;;) { // 如果当前线程不在头部节点并且头结点和尾节点不相等 返回-1 if (hasQueuedPredecessors()) return -1; //同公平模式 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } //初始化默认使用的非公平模式 public Semaphore(int permits) { sync = new NonfairSync(permits); } //指定模式 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } public void acquire() throws InterruptedException { //可打断 无位置则阻塞 sync.acquireSharedInterruptibly(1); } //不可打断 无位置则阻塞 public void acquireUninterruptibly() { sync.acquireShared(1); } //尝试获取 直接调用的就是非公平模式 不阻塞 public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } //尝试获取 阻塞直至超时返回 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //释放 并释放一个位置 public void release() { sync.releaseShared(1); } //可打断 获取permits位置 一直阻塞 直到获取成功 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //不可打断 获取permits位置 一直阻塞 直到获取成功 public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); } //尝试获取permits位置 不阻塞 public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } //尝试获取permits位置 阻塞直至时间过期 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); } //释放permits位置 且唤醒线程 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } //将剩余的位置全部占用 public int drainPermits() { return sync.drainPermits(); } //释放reduction位置 但不去唤醒 protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } //释放公平模式 public boolean isFair() { return sync instanceof FairSync; } //是否拥有等待的线程 其实就是去判断头部和尾部是否不相等 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } //获取等待的线程长度 public final int getQueueLength() { return sync.getQueueLength(); } //返回所有线程 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } public String toString() { return super.toString() + "[Permits = " + sync.getPermits() + "]"; } }
简单例子 :
public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(5); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { try { semaphore.acquire(); System.out.println("线程" + Thread.currentThread().getName() + "执行中"); Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { semaphore.release(); } System.out.println("线程" + Thread.currentThread().getName() + "执行完毕"); } }, "thread" + i).start(); } } }
总结:
“公平信号量”和”非公平信号量”的释放信号量的机制是一样的!不同的是它们获取信号量的机制:线程在尝试获取信号量许可时,对于公平信号量而言,如果当前线程不在队列的头部,则排队等候;而对于非公平信号量而言,无论当前线程是不是在队列的头部,它都会直接获取信号量。Semaphore通过acquire和release方法来进行获取和释放,可以不停的进行操作,用于控制线程数。
Semaphore也用到了AbstractQueuedSynchronizer,那么 下一篇就来解析下著名的AQS
相关文章推荐
- JAVA并发编程-障碍器CyclicBarrier,计数器CountDownLatch,信号量Semaphore
- 【Java并发编程】之二十三:并发新特性—信号量Semaphore(含代码)
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(1)
- Java多线程/并发22、信号量Semaphore
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(2)
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(3)
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(4)
- java并发之(4):Semaphore信号量、CounDownLatch计数锁存器和CyclicBarrier循环栅栏
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(5)
- Java并发系列3--信号量Semaphore
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(6)
- 转:【Java并发编程】之二十三:并发新特性—信号量Semaphore(含代码)
- 体验 Java 并发 api,用不同方式实现信号量锁(Semaphore)(7)
- Java并发学习笔记(15)信号量(Semaphore) 关卡((2)CyclicBarrier)
- 【Java并发编程】之二十三:并发新特性—信号量Semaphore(含代码)(r)
- Java并发包之Semaphore信号量
- java并发编程学习:用 Semaphore (信号量)控制并发资源
- java并发之Semaphore(计数信号量)
- Java并发-类库新组件 - Semaphore 理解:计数信号量
- JAVA并发信号量 Semaphore