您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer详解

2018-03-19 14:30 162 查看
AQS是自定义同步组件的关键,它的关注点是多线程访问共享资源受限时的线程处理策略,其中包括线程如何安全进入同步/等待队列,什么时候唤醒线程,需要取消线程时如何操作等等,而这些操作具有公共性,在所有的并发场景下都适用。而如何定义共享资源访问受限则是不同场景有不同场景的定义。

AQS的功能分为独占式线程处理策略和共享式线程处理策略,即其子类可以根据具体场景来选择使用不同的功能。

AQS包含一个内部类Node,这个内部类作为FIFO队列的节点类,因为AQS是对线程进程处理,自然Node内部类Thread类型的成员thread,表示这个节点所代表的线程。AQS维护的队列是一个双向队列。

Node {
thread     : Thread  表示当前节点所代表的线程
prev       : Node    指向前一个节点
next       : Node    指向后一个节点
waitStatus : int     节点状态
}


AQS使用一个同步状态
state
对线程是否可以访问资源进行指示,同步状态使用
volatile
关键字修饰使得线程在获取同步状态时可以取到最新值。这里的同步状态的含义就是我们通常意义上说的锁。

private volatile int state;


AQS的
acquire
方法表示获取同步状态,即获取锁。

public final void acquire(int acqu) {
if (!tryAcquire(acqu) && acquireQueued(addWaiter(Node.EXCLUSIVE), acqu))
selfInterrupt();
}


由于AQS的关注点在于资源访问受限时线程的处理,而资源访问受限的定义则交由其子类去实现,所以
tryAcquire
方法需要自行实现。另外4个需要自行实现的方法分别为
tryRelease/tryAcquireShared/tryReleaseShared/isHeldExclusively
。其中
tryAcquire/tryRelease
表示独占式地获取与释放同步状态,
tryAcquireShared/tryReleaseShared
表示共享式地获取与释放同步状态。之所以这些方法没有定义为抽象方法强迫子类去实现,原因是考虑到子类只使用了AQS的独占或共享功能,从而不需要对另一个功能进行实现。

public boolean tryAcquire(int acqu) {
throw new UnsupportedOperationException();
}
public boolean tryRelease(int acqu) {
throw new UnsupportedOperationException();
}
public boolean tryAcquireShared(int acqu) {
throw new UnsupportedOperationException();
}
public boolean tryReleaseShared(int acqu) {
throw new UnsupportedOperationException();
}
public boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}


以下面这个互斥锁的实现为例:

public class Mutex implements Lock {

private static final class Sync extends AbstractQueuedSynchronizer {

public boolean tryAcquire(int acqu) {
assert acqu == 1;
if (compareAndSetState(0, acqu)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

public boolean tryRelease(int acqu) {
assert acqu == 1;
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); //因为所有未持有锁的线程都在同步队列里面,所以这里修改同步状态
setState(0);                   //无需使用CAS操作
return true;
}

public boolean isHeldExclusively() {
return getState() == 1;
}
}

private static final Sync sync = new Sync();
public void lock() {sync.acquire(1);} //将操作代理到sync上即可
public void unlock() {sync.release(1);}
public Condition newCondition() {return new ConditionObject();}
public void lockInterrupibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {return sync.tryAcquire(1);}
public boolean tryLock(long timeout, TimeUnit nuit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}


加锁的操作实际调用的是
acquire
方法,根据AQS中
acquire
的定义,该方法首先调用
tryAcquire
方法,
tryAcquire
方法由我们自行实现,且此方法表达的含义是如何定义资源访问受限,即什么时候可以访问资源(获取到锁的时候,具体实现就是同步状态
state
为0的时候)。因此在上面的
tryAcquire
方法中通过CAS操作确保
state
为0时返回true,表示获取锁成功。当
state
为1时,
tryAcquire
返回false,这时执行
acquireQueued(addWaiter(Node.EXCLUSIVE), acqu)
操作,而此操作正是对未获取到锁的线程的操作,这是AQS的关注点,这些已经由大师Doug Lea帮我们实现了。

查看
addWaiter
的定义,此方法将开辟一个新的节点,用于保存未获取到锁的线程。通过自旋的方式(死循环/忙循环)将当前节点设置为尾节点。

private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail); //使用 VarHandle 以原子操作将当前节点的prev指向tail
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node
}
} else { // 若空的同步队列则初始化
initializeSyncQueue();
}
}
}


private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node()))) // 头节点是一个空节点,表示当前有线程在持有锁
tail = h;
}


将未获得锁的线程加到同步队列后,需要再判断该线程是否需要自旋或者是挂起,通过
acquireQueued
方法。

final boolean acuqireQueued(final Node node, int arg) {
for (;;) {

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java并发