您的位置:首页 > 编程语言 > Java开发

Java并发编程之AQS

2015-07-22 10:20 666 查看


谈到并发,不得不谈
ReentrantLock
;而谈到
ReentrantLock
,不得不谈
AbstractQueuedSynchronizer(AQS)
! 
类如其名,抽象的队列式的同步器,
AQS
定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的
ReentrantLock/Semaphore/CountDownLatch






维护了一个
volatile int state
(代表共享资源)和一个
FIFO
线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里
volatile
是核心关键词,具体
volatile
的语义,在此不述。
state
的访问方式有三种: 

它维护了一个
volatile int state
(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。这里
volatile
是核心关键词,具体
volatile
的语义,在此不述。
state
的访问方式有三种:
getState()

setState()

compareAndSetState()


AQS
定义两种资源共享方式:
Exclusive
(独占,只有一个线程能执行,如
ReentrantLock
)和
Share
(共享,多个线程可同时执行,如
Semaphore/CountDownLatch)


不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),
AQS
已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
isHeldExclusively()
:该线程是否正在独占资源。只有用到
condition
才需要去实现它。
tryAcquire(int)
:独占方式。尝试获取资源,成功则返回
true
,失败则返回
false

tryRelease(int)
:独占方式。尝试释放资源,成功则返回
true
,失败则返回
false

tryAcquireShared(int)
:共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)
:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回
true
,否则返回
false


ReentrantLock
为例,
state
初始化为0,表示未锁定状态。A线程
lock()
时,会调用
tryAcquire()
独占该锁并将
state+1
。此后,其他线程再
tryAcquire()
时就会失败,直到A线程
unlock()
state=0
(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以
CountDownLatch
以例,任务分为N个子线程去执行,
state
也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后
countDown()
一次,
state
CAS
减1。等到所有子线程都执行完后(即
state=0
),会
unpark()
主调用线程,然后主调用线程就会从
await()
函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现
tryAcquire-tryRelease
tryAcquireShared
tryReleaseShared
中的一种即可。但
AQS
也支持自定义同步器同时实现独占和共享两种方式,如
ReentrantReadWriteLock






此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是
lock()
的语义,当然不仅仅只限于
lock()
。获取到资源后,线程就可以去执行其临界区代码了。下面是
acquire()
的源码:

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

函数流程如下:
tryAcquire()
尝试直接去获取资源,如果成功则直接返回;
addWaiter()
将该线程加入等待队列的尾部,并标记为独占模式;
acquireQueued()
使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回
true
,否则返回
false


如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断
selfInterrupt()
,将中断补上。 

这时单凭这4个抽象的函数来看流程还有点朦胧,不要紧,看完接下来的分析后,你就会明白了。就像《大话西游》里唐僧说的:等你明白了舍生取义的道理,你自然会回来和我唱这首歌的。



此方法尝试去获取独占资源。如果获取成功,则直接返回
true
,否则直接返回
false
。这也正是
tryLock()
的语义,还是那句话,当然不仅仅只限于
tryLock()
。如下是
tryAcquire()
的源码:

protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

什么?直接throw异常?说好的功能呢?好吧,还记得概述里讲的AQS只是一个框架,具体资源的获取/释放方式交由自定义同步器去实现吗? 就是这里了!!!AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞(公平与非公平),那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

这里之所以没有定义成
abstract
,是因为独占模式下只用实现
tryAcquire-tryRelease
,而共享模式下只用实现
tryAcquireShared-tryReleaseShared
。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。说到底,Doug
Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。



此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

private Node addWaiter(Node mode) {
//以给定模式构造节点。mode有两种:EXCLUSIVE(独占)和SHATED(共享)
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速方式直接放到队尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失败则通过enq入队
enq(node);
return node;
}



此方法用于将
node
加入队尾。

private Node enq(final Node node) {
//CAS自旋,直到成功加入队尾
for (;;) {
Node t = tail;
if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
if (compareAndSetHead(new Node()))
tail = head;
} else {//正常流程,放入队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

如果你看过
AtomicInteger.getAndIncrement()
函数源码,那么相信你一眼便看出这段代码的精华。CAS自旋volatile变量,是一种很经典的用法。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: