您的位置:首页 > 产品设计 > UI/UE

AbstractQueuedSynchronizer原理分析

2017-03-18 14:55 363 查看


简介

提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作:

java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int,int)

子类推荐被定义为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。

同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。

同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:

1
Node
{
2
int
waitStatus;
3
Node
prev;
4
Node
next;
5
Node
nextWaiter;
6
Thread
thread;
7
}
以上五个成员变量主要负责保存该节点的线程引用,同步等待队列(以下简称sync队列)的前驱和后继节点,同时也包括了同步状态。
属性名称描述
intwaitStatus表示节点的状态。其中包含的状态有:

CANCELLED,值为1,表示当前的线程被取消;
SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
值为0,表示当前节点在sync队列中,等待着获取锁。

Nodeprev前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Nodenext后继节点。
NodenextWaiter存储condition队列中的后继节点。
Threadthread入队列时的当前线程。
节点成为sync队列和condition队列构建的基础,在同步器中就包含了sync队列。同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。




API说明

实现自定义同步器时,需要使用同步器提供的getState()、setState()和compareAndSetState()方法来操纵状态的变迁。
方法名称描述
protectedbooleantryAcquire(intarg)排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用compareAndSetState来做)状态。
protectedbooleantryRelease(intarg)释放状态。
protectedinttryAcquireShared(intarg)共享的模式下获取状态。
protectedbooleantryReleaseShared(intarg)共享的模式下释放状态。
protectedbooleanisHeldExclusively()在排它模式下,状态是否被占用。
实现这些方法必须是非阻塞而且是线程安全的,推荐使用该同步器的父类java.util.concurrent.locks.AbstractOwnableSynchronizer来设置当前的线程。

开始提到同步器内部基于一个FIFO队列,对于一个独占锁的获取和释放有以下伪码可以表示。

获取一个排他锁。

01
while
(获取锁)
{
02
if
(获取到)
{
03
退出
while
循环
04
}
else
{
05
if
(当前线程没有入队列)
{
06
那么入队列
07
}
08
阻塞当前线程
09
}
10
}
释放一个排他锁。

1
if
(释放成功)
{
2
删除头结点
3
激活原头结点的后继节点
4
}


示例

下面通过一个排它锁的例子来深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能够更加深入了解其他的并发组件。

排他锁的实现,一次只能一个线程获取到锁。

01
class
Mutex
implements
Lock,
java.io.Serializable{
02
//
内部类,自定义同步器
03
private
static
class
Sync
extends
AbstractQueuedSynchronizer
{
04
//
是否处于占用状态
05
protected
boolean
isHeldExclusively()
{
06
return
getState()
==
1
;
07
}
08
//
当状态为0的时候获取锁
09
public
boolean
tryAcquire(
int
acquires)
{
10
assert
acquires
==
1
;
//
Otherwiseunused
11
if
(compareAndSetState(
0
,
1
))
{
12
setExclusiveOwnerThread(Thread.currentThread());
13
return
true
;
14
}
15
return
false
;
16
}
17
//
释放锁,将状态设置为0
18
protected
boolean
tryRelease(
int
releases)
{
19
assert
releases
==
1
;
//
Otherwiseunused
20
if
(getState()
==
0
)
throw
new
IllegalMonitorStateException();
21
setExclusiveOwnerThread(
null
);
22
setState(
0
);
23
return
true
;
24
}
25
//
返回一个Condition,每个condition都包含了一个condition队列
26
Condition
newCondition(){
return
new
ConditionObject();
}
27
}
28
//
仅需要将操作代理到Sync上即可
29
private
final
Sync
sync=
new
Sync();
30
public
void
lock()
{sync.acquire(
1
);
}
31
public
boolean
tryLock()
{
return
sync.tryAcquire(
1
);
}
32
public
void
unlock()
{sync.release(
1
);
}
33
public
Condition
newCondition(){
return
sync.newCondition();
}
34
public
boolean
isLocked()
{
return
sync.isHeldExclusively();
}
35
public
boolean
hasQueuedThreads()
{
return
sync.hasQueuedThreads();
}
36
public
void
lockInterruptibly()
throws
InterruptedException
{
37
sync.acquireInterruptibly(
1
);
38
}
39
public
boolean
tryLock(
long
timeout,
TimeUnitunit)
40
throws
InterruptedException
{
41
return
sync.tryAcquireNanos(
1
,
unit.toNanos(timeout));
42
}
43
}
可以看到Mutex将Lock接口均代理给了同步器的实现。

使用方将Mutex构造出来之后,调用lock获取锁,调用unlock进行解锁。下面以Mutex为例子,详细分析以下同步器的实现逻辑。


实现分析


publicfinalvoidacquire(intarg)

该方法以排他的方式获取锁,对中断不敏感,完成synchronized语义。

1
public
final
void
acquire(
int
arg)
{
2
if
(!tryAcquire(arg)
&&
3
acquireQueued(addWaiter(Node.EXCLUSIVE),
arg))
4
selfInterrupt();
5
}
上述逻辑主要包括:

1.尝试获取(调用tryAcquire更改状态,需要保证原子性);

在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队。

2.如果获取不到,将当前线程构造成节点Node并加入sync队列;

进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。

3.再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

使用LockSupport将当前线程unpark,关于LockSupport后续会详细介绍。

01
private
Node
addWaiter(Nodemode){
02
Node
node=
new
Node(Thread.currentThread(),
mode);
03
//
快速尝试在尾部添加
04
Node
pred=tail;
05
if
(pred
!=
null
)
{
06
node.prev
=pred;
07
if
(compareAndSetTail(pred,
node)){
08
pred.next
=node;
09
return
node;
10
}
11
}
12
enq(node);
13
return
node;
14
}
15
16
private
Node
enq(
final
Node
node){
17
for
(;;)
{
18
Node
t=tail;
19
if
(t
==
null
)
{
//
Mustinitialize
20
if
(compareAndSetHead(
new
Node()))
21
tail
=head;
22
}
else
{
23
node.prev
=t;
24
if
(compareAndSetTail(t,
node)){
25
t.next
=node;
26
return
t;
27
}
28
}
29
}
上述逻辑主要包括:

1.使用当前线程构造Node;

对于一个节点需要做的是将当节点前驱节点指向尾节点(current.prev=tail),尾节点指向它(tail=current),原有的尾节点的后继节点指向它(t.next=current)而这些操作要求是原子的。上面的操作是利用尾节点的设置来保证的,也就是compareAndSetTail来完成的。

2.先行尝试在队尾添加;

如果尾节点已经有了,然后做如下操作:

(1)分配引用T指向尾节点;

(2)将节点的前驱节点更新为尾节点(current.prev=tail);

(3)如果尾节点是T,那么将当尾节点设置为该节点(tail=current,原子更新);

(4)T的后继节点指向当前节点(T.next=current)。

注意第3点是要求原子的。

这样可以以最短路径O(1)的效果来完成线程入队,是最大化减少开销的一种方式。

3.如果队尾添加失败或者是第一个入队的节点。

如果是第1个节点,也就是sync队列没有初始化,那么会进入到enq这个方法,进入的线程可能有多个,或者说在addWaiter中没有成功入队的线程都将进入enq这个方法。

可以看到enq的逻辑是确保进入的Node都会有机会顺序的添加到sync队列中,而加入的步骤如下:

(1)如果尾节点为空,那么原子化的分配一个头节点,并将尾节点指向头节点,这一步是初始化;

(2)然后是重复在addWaiter中做的工作,但是在一个while(true)的循环中,直到当前节点入队为止。

进入sync队列之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。而每个线程都是一个独立的个体,它们自省的观察,当条件满足的时候(自己的前驱是头结点并且原子性的获取了状态),那么这个线程能够继续运行。

01
final
boolean
acquireQueued(
final
Node
node,
int
arg)
{
02
boolean
failed
=
true
;
03
try
{
04
boolean
interrupted
=
false
;
05
for
(;;)
{
06
final
Node
p=node.predecessor();
07
if
(p
==head&&tryAcquire(arg)){
08
setHead(node);
09
p.next
=
null
;
//
helpGC
10
failed
=
false
;
11
return
interrupted;
12
}
13
if
(shouldParkAfterFailedAcquire(p,
node)&&
14
parkAndCheckInterrupt())
15
interrupted
=
true
;
16
}
17
}
finally
{
18
if
(failed)
19
cancelAcquire(node);
20
}
21
}
上述逻辑主要包括:

1.获取当前节点的前驱节点;

需要获取当前节点的前驱节点,而头结点所对应的含义是当前站有锁且正在运行。

2.当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;

如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。

3.否则进入等待状态。

如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。

这里针对acquire做一下总结:

1.状态的维护;

需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并且利用compareAndSet来确保原子性的修改。

2.状态的获取;

一旦成功的修改了状态,当前线程或者说节点,就被设置为头节点。

3.sync队列的维护。

在获取资源未果的过程中条件不符合的情况下(不该自己,前驱节点不是头节点或者没有获取到资源)进入睡眠状态,停止线程调度器对当前节点线程的调度。

这时引入的一个释放的问题,也就是说使睡眠中的Node或者说线程获得通知的关键,就是前驱节点的通知,而这一个过程就是释放,释放会通知它的后继节点从睡眠中返回准备运行。

下面的流程图基本描述了一次acquire所需要经历的过程:



如上图所示,其中的判定退出队列的条件,判定条件是否满足和休眠当前线程就是完成了自旋spin的过程。


publicfinalbooleanrelease(intarg)

在unlock方法的实现中,使用了同步器的release方法。相对于在之前的acquire方法中可以得出调用acquire,保证能够获取到锁(成功获取状态),而release则表示将状态设置回去,也就是将资源释放,或者说将锁释放。

1
public
final
boolean
release(
int
arg)
{
2
if
(tryRelease(arg))
{
3
Node
h=head;
4
if
(h
!=
null
&&
h.waitStatus!=
0
)
5
unparkSuccessor(h);
6
return
true
;
7
}
8
return
false
;
9
}
上述逻辑主要包括:

1.尝试释放状态;

tryRelease能够保证原子化的将状态设置回去,当然需要使用compareAndSet来保证。如果释放状态成功过之后,将会进入后继节点的唤醒过程。

2.唤醒当前节点的后继节点所包含的线程。

通过LockSupport的unpark方法将休眠中的线程唤醒,让其继续acquire状态。

01
private
void
unparkSuccessor(Node
node){
02
//
将状态设置为同步状态
03
int
ws
=node.waitStatus;
04
if
(ws
<
0
)
compareAndSetWaitStatus(node,ws,
0
);
//
获取当前节点的后继节点,如果满足状态,那么进行唤醒操作//如果没有满足状态,从尾部开始找寻符合要求的节点并将其唤醒Nodes=node.next;if(s==null||s.waitStatus>0){
05
s
=
null
;
06
for
(Node
t=tail;t!=
null
&&
t!=node;t=t.prev)
07
if
(t.waitStatus
<=
0
)
08
s
=t;
09
}
10
if
(s
!=
null
)
11
LockSupport.unpark(s.thread);
12
}
上述逻辑主要包括,该方法取出了当前节点的next引用,然后对其线程(Node)进行了唤醒,这时就只有一个或合理个数的线程被唤醒,被唤醒的线程继续进行对资源的获取与争夺。

回顾整个资源的获取和释放过程:

在获取时,维护了一个sync队列,每个节点都是一个线程在进行自旋,而依据就是自己是否是首节点的后继并且能够获取资源;

在释放时,仅仅需要将资源还回去,然后通知一下后继节点并将其唤醒。

这里需要注意,队列的维护(首节点的更换)是依靠消费者(获取时)来完成的,也就是说在满足了自旋退出的条件时的一刻,这个节点就会被设置成为首节点。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: