Java并发编程之AQS
2015-07-22 10:20
666 查看
谈到并发,不得不谈
ReentrantLock;而谈到
ReentrantLock,不得不谈
AbstractQueuedSynchronizer(AQS)!
类如其名,抽象的队列式的同步器,
AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的
ReentrantLock/Semaphore/CountDownLatch…
维护了一个
volatile int state(代表共享资源)和一个
FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里
volatile是核心关键词,具体
volatile的语义,在此不述。
state的访问方式有三种:
它维护了一个
volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里
volatile是核心关键词,具体
volatile的语义,在此不述。
state的访问方式有三种:
getState()
setState()
compareAndSetState()
AQS定义两种资源共享方式:
Exclusive(独占,只有一个线程能执行,如
ReentrantLock)和
Share(共享,多个线程可同时执行,如
Semaphore/CountDownLatch)。
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),
AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到
condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回
true,失败则返回
false。
tryRelease(int):独占方式。尝试释放资源,成功则返回
true,失败则返回
false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回
true,否则返回
false。
以
ReentrantLock为例,
state初始化为0,表示未锁定状态。A线程
lock()时,会调用
tryAcquire()独占该锁并将
state+1。此后,其他线程再
tryAcquire()时就会失败,直到A线程
unlock()到
state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以
CountDownLatch以例,任务分为N个子线程去执行,
state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后
countDown()一次,
state会
CAS减1。等到所有子线程都执行完后(即
state=0),会
unpark()主调用线程,然后主调用线程就会从
await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现
tryAcquire-tryRelease、
tryAcquireShared、
tryReleaseShared中的一种即可。但
AQS也支持自定义同步器同时实现独占和共享两种方式,如
ReentrantReadWriteLock。
此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是
lock()的语义,当然不仅仅只限于
lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是
acquire()的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
函数流程如下:
tryAcquire()尝试直接去获取资源,如果成功则直接返回;
addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回
true,否则返回
false。
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt(),将中断补上。
这时单凭这4个抽象的函数来看流程还有点朦胧,不要紧,看完接下来的分析后,你就会明白了。就像《大话西游》里唐僧说的:等你明白了舍生取义的道理,你自然会回来和我唱这首歌的。
此方法尝试去获取独占资源。如果获取成功,则直接返回
true,否则直接返回
false。这也正是
tryLock()的语义,还是那句话,当然不仅仅只限于
tryLock()。如下是
tryAcquire()的源码:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
什么?直接throw异常?说好的功能呢?好吧,还记得概述里讲的AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗? 就是这里了!!!AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞(公平与非公平),那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。
这里之所以没有定义成
abstract,是因为独占模式下只用实现
tryAcquire-tryRelease,而共享模式下只用实现
tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug
Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。
此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。
private Node addWaiter(Node mode) {
//以给定模式构造节点。mode有两种:EXCLUSIVE(独占)和SHATED(共享)
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队
enq(node);
return node;
}
此方法用于将
node加入队尾。
private Node enq(final Node node) {
//CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果你看过
AtomicInteger.getAndIncrement()函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很经典的用法。
相关文章推荐
- JAVA基础之数组
- 安装jdk出现问题:Error opening registry key'software\Javasoft\Java Runti
- 模板模式和策略模式的区别【转】
- 搭建Eclipse PHP开发环境
- GraphicsMagick+im4java 图片处理
- [MAC Eclipse] Eclipse for MAC 中文乱码的解决办法
- Struts2拦截器
- java 各类型之间转换总结
- springmvc分页
- JDK1.5中的多线程升级方案
- ofbiz myeclipse 配
- Java修饰符、抽象类、接口
- java函数--03
- Opentaps(OFBiz)在Eclipse下调试配置
- Android-PullToRefresh(一)在Eclipse中如何引入/Android-PullToRefresh和其Demo
- Java:使用synchronized和Lock对象获取对象锁
- 解决java web 乱码问题
- java.lang.NoSuchMethodException的错误
- ofbiz加载入eclipse
- java 转成字符串 json 数组和迭代