您的位置:首页 > 编程语言 > Java开发

java 同步 Synchonized 锁 ReentrantLock 原理 源码

2015-07-22 17:37 801 查看


synchronized:Java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码。


我们知道java中Synchonized 分三种情况,1.放在方法中。2.放在静态方法中。3.同步代码块。先来知道下三者的区别:

对于同步方法,锁是当前实例对象。(如果此对象同步方法执行中,那所有此对象的其他同步方法都阻塞)

对于静态同步方法,锁是当前对象的Class对象。(如果此静态同步方法执行中,此类所有其他静态同步方法阻塞---哪怕是不同实例)

对于同步方法块,锁是Synchonized括号里配置的对象。

当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。那么锁存在哪里呢?锁里面会存储什么信息呢?


同步的原理

JVM规范规定JVM基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者的实现细节不一样。代码块同步是使用monitorenter和monitorexit指令实现,而方法同步是使用另外一种方式实现的,细节在JVM规范里并没有详细说明,但是方法的同步同样可以使用这两个指令来实现。monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处, JVM要保证每个monitorenter必须有对应的monitorexit与之配对。任何对象都有一个
monitor 与之关联,当且一个monitor 被持有后,它将处于锁定状态。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 的所有权,即尝试获得对象的锁。

--------------------------------------------ReentrantLock-----------------------------------------------------------

ReentrantLock是JDK1.5引入的,它拥有与synchronized相同的并发性和内存语义,并提供了超出synchonized的其他高级功能(例如,中断锁等候、条件变量等)

ReentrantLock的实现基于AQS(AbstractQueuedSynchronizer)和LockSupport。

AQS主要利用CAS (compare-and-swap),来实现轻量级多线程同步机制,并且不会引起CPU上下文切换和调度,同时提供内存可见性和原子化更新保证(线程安全的三要素:原子性、可见性、顺序性)。

AQS的本质上是一个同步器/阻塞锁的基础框架,其作用主要是提供加锁、释放锁,并在内部维护一个FIFO等待队列,用于存储由于锁竞争而阻塞的线程。


关键代码分析

1.关键字段

AQS使用链表作为队列,使用volatile变量state,作为锁状态标识位。

/**

* Head of the wait queue, lazily initialized. Except for

* initialization, it is modified only via method setHead. Note:

* If head exists, its waitStatus is guaranteed not to be

* CANCELLED.

*/

private transient volatile Node head; //等待队列的头

/**

* Tail of the wait queue, lazily initialized. Modified only via

* method enq to add new wait node.

*/

private transient volatile Node tail; //等待队列的尾

/**

* The synchronization state.

*/

private volatile int state; //原子性的锁状态位,ReentrantLock对该字段的调用是通过原子操作compareAndSetState进行的

protected final boolean compareAndSetState(int expect, int update) {

// See below for intrinsics setup to support this

return unsafe.compareAndSwapInt(this, stateOffset, expect, update);

}

2.ReentrantLock的公平锁与非公平锁

从ReentrantLock的构造子可以看到,ReentrantLock提供两种锁:公平锁和非公平锁,其内部实现了两种同步器NonfairSync、FairSync派生自AQS,主要才采用了模板方法模式,主要重写了AQS的tryAcquire、lock方法,如下图。



3.获取锁操作

public void lock() {

sync.lock();

}

由于NonfairSync、FairSync分别实现了lock方法,我们将分别探讨

3.1NonfairSync.lock()分析

(1)通过原子的比较并设置操作,如果成功设置,说明锁是空闲的,当前线程获得锁,并把当前线程设置为锁拥有者;

(2)否则,调用acquire方法;

package java.util.concurrent.locks.ReentrantLock;

final void lock() {

if (compareAndSetState(0, 1))//表示如果当前state=0,那么设置state=1,并返回true;否则返回false。由于未等待,所以线程不需加入到等待队列

setExclusiveOwnerThread(Thread.currentThread());

else

acquire(1);

}

package java.util.concurrent.locks.AbstractOwnableSynchronizer //AbstractOwnableSynchronizer是AQS的父类

protected final void setExclusiveOwnerThread(Thread t) {

exclusiveOwnerThread = t;

}

3.1.1acquire方法分析

(1)如果尝试以独占的方式获得锁失败,那么就把当前线程封装为一个Node,加入到等待队列中;如果加入队列成功,接下来检查当前线程的节点是否应该等待(挂起),如果当前线程所处节点的前一节点的等待状态小于0,则通过LockSupport挂起当前线程;无论线程是否被挂起,或者挂起后被激活,都应该返回当前线程的中断状态,如果处于中断状态,需要中断当前线程。

package java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

protected final boolean tryAcquire(int acquires) {

return nonfairTryAcquire(acquires);

}

3.1.2nonfairTryAcquire分析

(1)如果锁状态空闲(state=0),且通过原子的比较并设置操作,那么当前线程获得锁,并把当前线程设置为锁拥有者;

(2)如果锁状态空闲,且原子的比较并设置操作失败,那么返回false,说明尝试获得锁失败;

(3)否则,检查当前线程与锁拥有者线程是否相等(表示一个线程已经获得该锁,再次要求该锁,这种情况叫可重入锁),如果相等,维护锁状态,并返回true;

(4)如果不是以上情况,说明锁已经被其他的线程持有,直接返回false;

final boolean nonfairTryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) { //表示一个线程已经获得该锁,再次要求该锁(重入锁的由来),为状态位加acquires

int nextc = c + acquires;

if (nextc < 0) // overflow

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

3.1.3addWaiter分析

(1)如果tail节点不为null,说明队列不为空,则把新节点加入到tail的后面,返回当前节点,否则进入enq进行处理(2);

(2)如果tail节点为null,说明队列为空,需要建立一个虚拟的头节点,并把封装了当前线程的节点设置为尾节点;另外一种情况的发生,是由于在(1)中的compareAndSetTail可能会出现失败,这里采用for的无限循环,是要保证当前线程能够正确进入等待队列;

acquireQueued分析package java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node addWaiter(Node mode) {

Node node = new Node(Thread.currentThread(), mode);

// Try the fast path of enq; backup to full enq on failure

Node pred = tail;

if (pred != null) { //如果当前队列不是空队列,则把新节点加入到tail的后面,返回当前节点,否则进入enq进行处理。

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

enq(node);

return node;

}

package java.util.concurrent.locks.AbstractQueuedSynchronizer

private Node enq(final Node node) {

for (;;) {

Node t = tail;

if (t == null) { // tail节点为空,说明是空队列,初始化头节点,如果成功,返回头节点

Node h = new Node(); // Dummy header

h.next = node;

node.prev = h;

if (compareAndSetHead(h)) {

tail = node;

return h;

}

}

else { //

node.prev = t;

if (compareAndSetTail(t, node)) {

t.next = node;

return t;

}

}

}

3.1.4
acquireQueued分析

(1)如果当前节点是队列的头结点(如果第一个节点是虚拟节点,那么第二个节点实际上就是头结点了),就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点(不管第一个结点是否是虚拟节点),返回中断状态。否则进行(2)。

(2)检测当前节点是否应该park()-"挂起的意思",如果应该park()就挂起当前线程并且返回当前线程中断状态。进行操作(1)。

final boolean acquireQueued(final Node node, int arg) {

try {

boolean interrupted = false;

for (;;) {

final Node p = node.predecessor();

if (p == head && tryAcquire(arg)) {

setHead(node);

p.next = null; // help GC

return interrupted;

}

if (shouldParkAfterFailedAcquire(p, node) &&

parkAndCheckInterrupt())

interrupted = true;

}

} catch (RuntimeException ex) {

cancelAcquire(node);

throw ex;

}

}

3.1.5 shouldParkAfterFailedAcquire分析

(1)如果前一个节点的等待状态waitStatus<0,也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行(2)。

(2)如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行(4)。否则进行(3)。

(3)前一个节点等待状态waitStatus=0,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。进行(4)。

(4)返回false,表示线程不应该park()。

注意:一个Node节点可包含以下状态以及模式:

/** waitStatus value to indicate thread has cancelled */ 取消

static final int CANCELLED = 1;

/** waitStatus value to indicate successor's thread needs unparking */ 信号等待(在AQS中,是通过LockSupport进行线程间信号交互的)

static final int SIGNAL = -1;

/** waitStatus value to indicate thread is waiting on condition */ 条件等待

static final int CONDITION = -2;

/** Marker to indicate a node is waiting in shared mode */ 共享模式

static final Node SHARED = new Node();

/** Marker to indicate a node is waiting in exclusive mode */ 独占模式

static final Node EXCLUSIVE = null;

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

int s = pred.waitStatus;

if (s < 0)

/*

* This node has already set status asking a release

* to signal it, so it can safely park

*/

return true;

if (s > 0) {

/*

* Predecessor was cancelled. Skip over predecessors and

* indicate retry.

*/

do {

node.prev = pred = pred.prev;

} while (pred.waitStatus > 0);

pred.next = node;

}

else

/*

* Indicate that we need a signal, but don't park yet. Caller

* will need to retry to make sure it cannot acquire before

* parking.

*/

compareAndSetWaitStatus(pred, 0, Node.SIGNAL);

return false;

}

private final boolean parkAndCheckInterrupt() {

LockSupport.park(this); //阻塞,即挂起;在没有unpark之前,下面的代码将不会执行;

return Thread.interrupted();//个人感觉,如果没有外部的interrupt或者超时等,这里将始终返回false;

}

private static void selfInterrupt() {

Thread.currentThread().interrupt();

}

3.2FairSync.lock()分析

公平锁相对与非公平锁,在锁的获取实现上,差别只在FairSync提供自己的tryAcquire()的方法实现,状态0空闲入队前还多了个判断hasQueuedPredecessors;判断是否有其他线程也在等待获取锁,并且比当前线程等待的时间长。

protected final boolean tryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

protected final boolean tryAcquire(int acquires) {

final Thread current = Thread.currentThread();

int c = getState();

if (c == 0) {

if (!hasQueuedPredecessors() &&

compareAndSetState(0, acquires)) {

setExclusiveOwnerThread(current);

return true;

}

}

else if (current == getExclusiveOwnerThread()) {

int nextc = c + acquires;

if (nextc < 0)

throw new Error("Maximum lock count exceeded");

setState(nextc);

return true;

}

return false;

}

4.总结

ReentrantLock在采用非公平锁构造时,首先检查锁状态,如果锁可用,直接通过CAS设置成持有状态,且把当前线程设置为锁的拥有者。

如果当前锁已经被持有,那么接下来进行可重入检查,如果可重入,需要为锁状态加上请求数。如果不属于上面两种情况,那么说明锁是被其他线程持有,

当前线程应该放入等待队列。

在放入等待队列的过程中,首先要检查队列是否为空队列,如果为空队列,需要创建虚拟的头节点,然后把对当前线程封装的节点加入到队列尾部。由于设置尾部节点采用了CAS,为了保证尾节点能够设置成功,这里采用了无限循环的方式,直到设置成功为止。

在完成放入等待队列任务后,则需要维护节点的状态,以及及时清除处于Cancel状态的节点,以帮助垃圾收集器及时回收。如果当前节点之前的节点的等待状态小于1,说明当前节点之前的线程处于等待状态(挂起),那么当前节点的线程也应处于等待状态(挂起)。挂起的工作是由LockSupport类支持的,LockSupport通过JNI调用本地操作系统来完成挂起的任务(java中除了废弃的suspend等方法,没有其他的挂起操作)。

在当前等待的线程,被唤起后,检查中断状态,如果处于中断状态,那么需要中断当前线程。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: