java并发编程实践笔记14
2017-01-19 00:41
405 查看
自定义Semaphore和ReentrantLock
主要实现利用jdk的Semaphore和ReentrantLock互相委托,实现自定义的Semaphore和ReentrantLock。使用ReentrantLock实现Semaphore
下面是自定义的Semaphore代码:/** * 自定义 * * 委托给 lock 和condition实现 * Created by Administrator on 2017/1/18 0018. */ public class DelegateSemaphore { private final Lock lock = new ReentrantLock(); private final Condition acquireCon = lock.newCondition(); //许可剩余个数 private int permit; public DelegateSemaphore(int permit) { this.permit = permit; } public void acquire() throws InterruptedException { lock.lock(); if (permit <= 0) { acquireCon.await(); } permit--; lock.unlock(); } public void release() { lock.lock(); permit++; acquireCon.signal(); lock.unlock(); } }
说明:
1. permit表示许可个数
2. lock和acquireCon实现 获取阻塞 和释放。
测试代码如下:
@Test public void testDelegateSemaphore() throws InterruptedException { DelegateSemaphore semaphore = new DelegateSemaphore(2); new Thread(){ @Override public void run() { try { semaphore.acquire(); System.out.println(System.currentTimeMillis()/1000+"获得一个许可"); semaphore.acquire(); System.out.println(System.currentTimeMillis()/1000+"获得一个许可"); semaphore.acquire(); System.out.println(System.currentTimeMillis()/1000+"获得一个许可"); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); TimeUnit.SECONDS.sleep(4); semaphore.release(); }
测试用例执行结果如下:
1484754353获得一个许可 1484754353获得一个许可 1484754357获得一个许可
测试结果说明:
1. 测试代码设置了2个许可,所以前面两次调用acquire,很快获得通过。但是第三次和第二次明显等待了4秒(打印时间说明了这个问题),因为4秒后,main函数执行了一次release。
使用Semaphore实现ReentrantLock
首先我们来看一个测试:@Test public void testReentrantLock() throws InterruptedException { ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); new Thread(){ @Override public void run() { try { reentrantLock.unlock(); System.out.println("子线程已经调用unlock"); } catch (Exception e) { e.printStackTrace(); } } }.start(); TimeUnit.SECONDS.sleep(2); System.out.println( "当前线程是否持有锁:"+reentrantLock.isLocked()); }
上面测试打印结果为:
java.lang.IllegalMonitorStateException at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151) at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261) at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457) at com.ly.examples.concurrency.AqsTest$4.run(AqsTest.java:86) 当前线程是否持有锁:true
说明:ReentrantLock 获得锁后,在另外一个线程调用unlock 是没用的,并且会抛出异常,抛出异常源代码如下:
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
自定义的ReentrantLock代码如下:
/** * 自定义ReentrantLock 委托给 * Created by Administrator on 2017/1/18 0018. */ public class DelegateReentrantLock { private final Semaphore semaphore = new Semaphore(1); private int lockCount; private volatile Thread holdThread; public void lock() throws InterruptedException { if (holdThread == Thread.currentThread()) { lockCount++; } else { semaphore.acquire(); holdThread = Thread.currentThread(); lockCount++; } } public void unlock() { if (holdThread == Thread.currentThread()) { lockCount--; if (lockCount == 0) { holdThread = null; semaphore.release(); } } else { throw new IllegalMonitorStateException(); } } public boolean isLocked() { return Thread.currentThread() == holdThread; } }
测试和上面的testReentrantLock差不多,只是改为使用DelegateReentrantLock,执行结果如下:
java.lang.IllegalMonitorStateException at com.ly.examples.concurrency.DelegateReentrantLock.unlock(DelegateReentrantLock.java:34) at com.ly.examples.concurrency.AqsTest$5.run(AqsTest.java:106) 当前线程是否持有锁:true
所以说明写的自定义lock也可以正常使用。
分析下代码写法会不会有问题,因为我觉得自己定义的lockCount和holdThread变量都没有锁住之后再修改,总觉得有问题,这里简单分析如下:
1. 假设n个线程同时请求锁,lock中holdThread和lockCount的修改是在获取唯一许可之后,所以只会有一个线程修改。
2. 假设线程1已经获得锁,正在释放锁unlock,其他n个线程获取lock,线程1如果lockCount ==0,先设置了holdThread=null,其他n个线程会调用到 semaphore.acquire();请求获取,然后线程1 已经处理完更改,调用 semaphore.release();真正释放。
分析后发现lockCount和holdThread 要么是在lock方法中线程获取到唯一许可后,所以不存在竞争。要么是在unlock中,设置完成后调用semaphore.release(),换句话说改完了我再release,所以也不存在竞争.
用volatile修饰holdThread有什么好处呢?这个时候unlock设置了holdThread == null可以对其他线程立即可见,所以还是有好处的。
AbstractQueuedSynchronizer
同步类核心点,判断某个状态(条件)是否满足,满足准许执行,否则返回失败,或者进入等待队列。AQS在同步类中就充当了状态管理者和阻塞实行者。AZQS有一个关于状态信息的单一整数,可以通过getState,setState和CompareAndSetState等方法来操作。
如ReentrantLock 用state表示当前线程请求了多少次锁,Semaphore用它来表示剩余的许可数。FutureTask用它来表示任务的状态(尚未开始,运行,完成 和取消)。
AQS 获取和释放范式
boolean acquire() throws InterruptedException { while (state does not permit acquire){ if (blocking acquisition requested){ enqueue current thread if not already queued block current thread } else return failure } possibly update synchronization state dequeue thread if it was queued return success } void release() { update synchronization state if (new state may permit a blocked thread to acquire) unblock one or more queued threads }
支持独占获取的synchronizer,应该实现tryAcquire,tryRelease和isHeldExclusively。
支持独占获取的synchronizer,应该实现tryAcquireShared,tryReleaseShared和isHeldExclusively。
通过getState,setState,compareAndSetState来维护状态
一个简单的闭锁
/** * 类似于CountDownLatch 不过 允许个数是1 * <p> * Created by Administrator on 2017/1/18 0018. */ public class OneShotLatch { private final Sync sync = new Sync(); /** * 执行阻塞 */ public void await() { sync.acquireShared(0); } public void signal() { sync.releaseShared(0); } private class Sync extends AbstractQueuedSynchronizer { public Sync() { setState(1); } @Override protected int tryAcquireShared(int arg) { return getState() == 0 ? 1 : -1; } @Override protected boolean tryReleaseShared(int arg) { setState(0); return true; } } }
说明:
1. 这里我用state=1表示latch被关闭
2. 因为 AQS 实现 tryAcquireShared返回大于0整数表示获取成功,所以我这里判断state=0返回 1,表示获取成功。
3. tryReleaseShared直接设置state=0。表示激活,然后发挥true。
AQS 源码分析
类继承
AbstractOwnableSynchronizer (java.util.concurrent.locks) -- AbstractQueuedSynchronizer (java.util.concurrent.locks)
AbstractOwnableSynchronizer
AbstractOwnableSynchronizer很简答就一个属性:private transient Thread exclusiveOwnerThread;
表示该aqs属于哪个线程,并且是排他的线程。
state
/** * The synchronization state. */ private volatile int state;
state 维护者Synchronizer的状态 ,如前面说过的ReentrantLock 用state表示当前线程请求了多少次锁,Semaphore用它来表示剩余的许可数。
相关的几个状态维护的方法:
getState
compareAndSetState:通过jdk的Unsafe的cas操作直接的,原子性的改变state地址的值。
setState
使用说明
一般要实现自己的同步类,都是在内部定义一个内部工具类(继承AQS)。AQS没有定义 也没有实现任何同步接口。而是定义了一些方法(如acquireInterruptibly),它们可以被具体的锁或者同步类调用。
acquire相关方法
相关方法列表如下:前面两个acquire的方法是排他模式的顶层接口,一般实现排他模式的同步器,会调用这两个类。当然我们要实现自然是tryAcqure相关方法,一般直接调用tryAcqure较少,但是也有。比如如果我们实现tryLock就可以。
acquireShared方法自然是共享模式,和上面类似,我们实现的AQS子类要实现tryAcquireShared相关方法。
并且acquire和acquireShared 这些上层接口是final方法,不能被继覆盖的。
对于内部的队列提供了检查,工具和监控相关的方法,对于条件对象也提供了类似的方法。
序列化这个类 只有state 字段会被序列化,线程队列是没有被序列化的。一般子类可以实现readObject方法来恢复state.
AQS的 (排他模式)exclusive mode 和shared model(共享模式)说明
其他线程调用acquire*方法不能成功。其他线程调用acquireShared*方法不能可能成功,也可能失败(比如读写锁,如果写锁被其他线程获取了,我们都锁也是阻塞的,拿不到的)。
两种模式下的线程的等待队列是共享的同一个FIFO队列。
一般情况下,子类只实现两种模式下的一种模式的支持,但也有例外,如ReadWriteLock。
支持一个模式的另外的类不用实现。
waitStatus 有以下几种情况
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
初始化值为0。
分析一个 排他模式代码
@Test public void testReentrantLock2() throws InterruptedException { final ReentrantLock reentrantLock = new ReentrantLock(); reentrantLock.lock(); new Thread() { @Override public void run() { try { reentrantLock.lock(); System.out.println("当前子线程是否持有锁:" + reentrantLock.isLocked()); } catch (Exception e) { e.printStackTrace(); } } }.start(); TimeUnit.SECONDS.sleep(2); reentrantLock.unlock(); TimeUnit.SECONDS.sleep(5000); System.out.println("当前线程是否持有锁:" + reentrantLock.isLocked()); }
调用流程主要如下:
上图中等待队列以及node数据分析:
第一步中 :head 和tail 都是null。
第四步中:阻塞的时候,因为waitStatus = -1表示是一个信号节点。如果一个线程进入阻塞,必须设置前面的节点waitStatus=-1.并且waitStatus开始必须为 0或者 PROPAGATE(-3)。
第七步中:获得锁,设置head为子线程节点。
ps:waitStatus=-2表示当前的线程节点正在等待一个condition,后面再专门针对条件队列分析。
分析一个共享模式代码
@Test public void testSemaphore() throws InterruptedException { Semaphore semaphore = new Semaphore(1); new Thread() { @Override public void run() { try { semaphore.acquire(2); System.out.println("get the lock"); } catch (InterruptedException e) { e.printStackTrace(); } } }.start(); TimeUnit.SECONDS.sleep(60); System.out.println("main 调用release"); semaphore.release(2); TimeUnit.SECONDS.sleep(5000); }
说明:
其实和上面逻辑差不多,子线程拿不到足够许可,进入阻塞(这里有个不同是子节点的nextWaiter=SHARED),然后主线程释放2个所以总共有3个许可,并且激活头节点的下一个节点。子线程请求两个许可。关键不同在此处,调用的不是setHead而是setHeadAndPropagate:这个方法除了设置head,还会继续激活下一个节点,传递下去。
为什么说tryAcquireShared 返回>0整数表示获取成功,如下图:
Acquire: while (!tryAcquire(arg)) { <em>enqueue thread if it is not already queued</em>; <em>possibly block current thread</em>; }
因为返回>0 数表示true。
fair 和not fair
为什么已经存在FIFO队列,还会产生非公平呢?我们先看下面的伪代码 很好的说明了AQS的运行原理:
* Acquire: * while (!tryAcquire(arg)) { * <em>enqueue thread if it is not already queued</em>; * <em>possibly block current thread</em>; * } * * Release: * if (tryRelease(arg)) * <em>unblock the first queued thread</em>; * </pre> * * (Shared mode is similar but may involve cascading signals.)
通过上面的伪代码我们知道,acquire实现是先获取,没有获取到再入队列,这样正在获取的线程和队列的第一个线程会存在同时acquire情形,所以不一定按队列形式公平的得到锁了。
如果想要实现公平获取,我们可以在tryAcquire方法里面调用hasQueuedPredecessors判断是否队列已经有等待线程。如果有tryAcquire返回false。那当前线程就自觉去排队,变成公平获取了。
相关文章推荐
- java并发编程实践笔记
- java并发编程实践笔记
- java并发编程实践笔记
- Java并发编程实践笔记之——原子性(Atomicity)
- java并发编程实践笔记
- Java并发编程实践笔记之-什么是线程安全
- java并发编程实践笔记
- java并发编程实践笔记(转)
- Java并发编程实践笔记
- java并发编程实践 笔记(1)
- JAVA 并发编程实践 - 原子变量与非阻塞同步机制 笔记
- 编程实践笔记{Java 线程 并发处理 Webservice}
- java线程学习(二)—并发编程实践学习笔记
- java并发编程实践笔记
- java并发编程实践笔记
- Java并发编程实践笔记之—阻塞和中断方法(Blocking and Interruptible Methods)
- 编程实践笔记{Java 线程 并发处理 Webservice}(转)
- Java并发编程实践笔记之—线程封闭(Thread Confinement)
- JAVA并发编程实践笔记
- Java并发编程实践阅读笔记