您的位置:首页 > 编程语言 > Java开发

Java 7之多线程第7篇 - 线程锁基础

2017-03-06 17:39 162 查看
线程锁是用来实现同步机制的,前面讲到过使用synchronized关键字来实现同步。

传送门 - 使用Synchronized关键字实现同步  http://blog.csdn.net/mazhimazh/article/details/16921255

使用这个关键字实现的同步块有一些缺点:

(1)锁只有一种类型

(2)线程得到锁或者阻塞

(3)不能实现很好的并发

为了解决如上的各种问题,后来又提出了一种更为复杂的锁 - 线程锁。线程锁可以在几个方面进行提升:

(1)添加不同类型的锁,如读取锁和写入锁(主要实现类为ReentrantReadWriteLock类)

(2)对锁的阻塞没有限制,即可以在一个方法中上锁,在另外一个方法中解锁。

(3)如果线程得不到锁,比如锁由另外一个线程持有,就允许该线程后退或继续执行,或者做其他事情 - 使用类中提供的tryLock()方法

(4)允许线程尝试取锁,并可以在超过等待时间后放弃。

下面来认识一下一个简单的线程锁 - Lock锁。简单的用法如下:

[java]
view plain
copy

print?





public class Counter{  
    private Lock lock = new ReentrantLock();  
    private int count = 0;  
  
    public int inc(){  
        lock.lock();  
        int newCount = ++count;  
        lock.unlock();  
        return newCount;  
    }  
}  



public class Counter{
private Lock lock = new ReentrantLock();
private int count = 0;

public int inc(){
lock.lock();
int newCount = ++count;
lock.unlock();
return newCount;
}
}


由于++count包括读取、自增、赋值操作,所以为了保证能够原子执行,使用了Lock锁。

Lock锁的功能很强大,不过他的实现复杂了不少。锁的实现源代码主要在Java.util.concurrent.locks包下,先看看锁的框架图,如下所示。



注:如上的粗箭头表示,下面类的实现依赖于上面类的实现。例如,AbstractQueueSynchronizer抽象类的实现依赖于LockSupport和Condition类。

1、Lock接口

Lock接口中定义的方法如下:

[java]
view plain
copy

print?





public interface Lock {  
  
    void lock();        // 获取锁  
    void lockInterruptibly() throws InterruptedException; // 如果当前线程没有被打断,则获取锁  
    boolean tryLock(); // 如果在调用的时候能锁没有被其它线程持有就获取这个锁  
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 在给定的时间内获取到锁  
    void unlock();     // 释放锁  
    Condition newCondition(); // 得到Condition实例  
}  



public interface Lock {

void lock();        // 获取锁
void lockInterruptibly() throws InterruptedException; // 如果当前线程没有被打断,则获取锁
boolean tryLock(); // 如果在调用的时候能锁没有被其它线程持有就获取这个锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 在给定的时间内获取到锁
void unlock();     // 释放锁
Condition newCondition(); // 得到Condition实例
}


2、AbstractOwnableSynchronizer接口

[java]
view plain
copy

print?





public abstract class AbstractOwnableSynchronizer   implements java.io.Serializable {  
    protected AbstractOwnableSynchronizer() { }  
      
    private transient Thread exclusiveOwnerThread;// 独占锁的持有线程  
      
    // 提供锁的setXxx()和getXxx()方法  
    protected final void setExclusiveOwnerThread(Thread t) {  
        exclusiveOwnerThread = t;  
    }  
    protected final Thread getExclusiveOwnerThread() {  
        return exclusiveOwnerThread;  
    }  
}  



public abstract class AbstractOwnableSynchronizer   implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }

private transient Thread exclusiveOwnerThread;// 独占锁的持有线程

// 提供锁的setXxx()和getXxx()方法
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

3、下面来介绍一下几个需要重点理解的概念

1>. AQS -- 指AbstractQueuedSynchronizer类。

    AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如ReentrantLock)和共享锁(例如Semaphore)的公共父类。
2>. 队列
    队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。队列就是管理这些“等待锁”的线程的队列。

    队列是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。
    来看一下队列的Node节点源代码,如下:    

[java]
view plain
copy

print?





static final class Node {  
    static final Node SHARED = new Node();// 共享锁  
    static final Node EXCLUSIVE = null;   // 独占锁  
      
    // 定义waitStatus值  
    static final int CANCELLED =  1;  // 线程已被取消  
    static final int SIGNAL    = -1;  // 当前线程的后继线程需要被unpark(唤醒)  
      
    static final int CONDITION = -2;  // 处在Condition休眠状态的线程在等待Condition唤醒  
    static final int PROPAGATE = -3;  // 其它线程获取到共享锁  
  
    /** 
     * Status field, taking on only the values: 
     *   SIGNAL:     The successor of this node is (or will soon be) 
     *               blocked (via park), so the current node must 
     *               unpark its successor when it releases or 
     *               cancels. To avoid races, acquire methods must 
     *               first indicate they need a signal, 
     *               then retry the atomic acquire, and then, 
     *               on failure, block. 
     *   CANCELLED:  This node is cancelled due to timeout or interrupt. 
     *               Nodes never leave this state. In particular, 
     *               a thread with cancelled node never again blocks. 
     *   CONDITION:  This node is currently on a condition queue. 
     *               It will not be used as a sync queue node 
     *               until transferred, at which time the status 
     *               will be set to 0. (Use of this value here has 
     *               nothing to do with the other uses of the 
     *               field, but simplifies mechanics.) 
     *   PROPAGATE:  A releaseShared should be propagated to other 
     *               nodes. This is set (for head node only) in 
     *               doReleaseShared to ensure propagation 
     *               continues, even if other operations have 
     *               since intervened. 
     *   0:          None of the above 
     * 
     * The values are arranged numerically to simplify use. 
     * Non-negative values mean that a node doesn't need to 
     * signal. So, most code doesn't need to check for particular 
     * values, just for sign. 
     * 
     * The field is initialized to 0 for normal sync nodes, and 
     * CONDITION for condition nodes.  It is modified using CAS 
     * (or when possible, unconditional volatile writes). 
     */  
    volatile int waitStatus;  
  
    volatile Node prev;  
    volatile Node next;  
    volatile Thread thread; // 节点对应的当前线程  
    Node nextWaiter;  
      
    // 共享锁则返回true,独占锁则返回false。  
    final boolean isShared() {  
        return nextWaiter == SHARED;  
    }  
    // 如果有的话,返回前一个节点  
    //predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能  
    final Node predecessor() throws NullPointerException {  
        Node p = prev;  
        if (p == null)  
            throw new NullPointerException();  
        else  
            return p;  
    }  
    // --------几个构造函数----------------------------------  
    Node() {  }    
    Node(Thread thread, Node mode) {     // Used by addWaiter  
        this.nextWaiter = mode;  
        this.thread = thread;  
    }  
    Node(Thread thread, int waitStatus) { // Used by Condition  
        this.waitStatus = waitStatus;  
        this.thread = thread;  
    }  
}// end Node  
  
/* CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 
 * CAS 保证节点插入和移除的原子性 
 */  
private transient volatile Node head;   
private transient volatile Node tail;  



static final class Node {
static final Node SHARED = new Node();// 共享锁
static final Node EXCLUSIVE = null;   // 独占锁

// 定义waitStatus值
static final int CANCELLED =  1;  // 线程已被取消
static final int SIGNAL    = -1;  // 当前线程的后继线程需要被unpark(唤醒)

static final int CONDITION = -2;  // 处在Condition休眠状态的线程在等待Condition唤醒
static final int PROPAGATE = -3;  // 其它线程获取到共享锁

/**
* Status field, taking on only the values:
*   SIGNAL:     The successor of this node is (or will soon be)
*               blocked (via park), so the current node must
*               unpark its successor when it releases or
*               cancels. To avoid races, acquire methods must
*               first indicate they need a signal,
*               then retry the atomic acquire, and then,
*               on failure, block.
*   CANCELLED:  This node is cancelled due to timeout or interrupt.
*               Nodes never leave this state. In particular,
*               a thread with cancelled node never again blocks.
*   CONDITION:  This node is currently on a condition queue.
*               It will not be used as a sync queue node
*               until transferred, at which time the status
*               will be set to 0. (Use of this value here has
*               nothing to do with the other uses of the
*               field, but simplifies mechanics.)
*   PROPAGATE:  A releaseShared should be propagated to other
*               nodes. This is set (for head node only) in
*               doReleaseShared to ensure propagation
*               continues, even if other operations have
*               since intervened.
*   0:          None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes.  It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

volatile Node prev;
volatile Node next;
volatile Thread thread; // 节点对应的当前线程
Node nextWaiter;

// 共享锁则返回true,独占锁则返回false。
final boolean isShared() {
return nextWaiter == SHARED;
}
// 如果有的话,返回前一个节点
//predecessor引用的作用是为了支持锁等待超时(timeout)和锁等待回退(cancellation)的功能
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// --------几个构造函数----------------------------------
Node() {  }
Node(Thread thread, Node mode) {     // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}// end Node

/* CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和
* CAS 保证节点插入和移除的原子性
*/
private transient volatile Node head;
private transient volatile Node tail;
 举一个操作,向队列末尾插入元素时的操作如下:

[java]
view plain
copy

print?





// 向队列中插入Node节点  
    private Node enq(final Node node) {  
        for (;;) {  
            Node t = tail;  
            if (t == null) { // Must initialize  
                if (compareAndSetHead(new Node()))  
                    tail = head;  
            } else {  
                node.prev = t;  
                if (compareAndSetTail(t, node)) {  
                    t.next = node;  
                    return t;  
                }  
            }  
        }  
    }  



// 向队列中插入Node节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}


3>. CAS函数 -- Compare And Swap 

     CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。
      如果对他们的原理不怎么理解,可以查看:传送门 - http://blog.csdn.net/mazhimazh/article/details/18908493

转载自http://blog.csdn.net/mazhimazh/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Java 7之多线程