您的位置:首页 > 编程语言 > Java开发

JAVA并发编程学习笔记之AQS源码分析(超时、中断与其他)

2016-05-21 15:23 543 查看

中断

JAVA中并没有好的中断线程的方式,早期引入的Thead.stop()和Thread.resume()容易导致死锁(参考:http://docs.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html),已经不推荐使用。

JAVA线程提供了协作式中断,何为协作是中断,是相对抢占式中断而言的,简单来讲就是设置一个中断标志位,不停地检查这个标志位的状态,如果检查到线程中断,就中断线程。JVM线程内部维护着一个中断标志,程序员不能直接操作这个中断标志位,只能通过线程的以下几个方法设置中断位:

[html] view
plain copy

 print?

public void interrupt()  

public static boolean interrupted()   

private native boolean isInterrupted(boolean ClearInterrupted);  

public boolean isInterrupted()  

AQS中提供了支持中断的方法

[html] view
plain copy

 print?

private void doAcquireInterruptibly(int arg) throws InterruptedException;  

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException;   

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException;  

这几个方法都抛出了InterruptedException,这些方法都会先出处中断异常,处理的代码如下:

[html] view
plain copy

 print?

if (Thread.interrupted())  

    throw new InterruptedException();  

我们还看到有些方法并没有申请抛出InterruptedException,当它被中断时,设置了线程的中断位。

[html] view
plain copy

 print?

private static void selfInterrupt() {  

    Thread.currentThread().interrupt();  

}  

超时

AQS与JVM内置锁的一个不同点在于AQS中提供了超时机制,即线程在等待一定时间后会立即返回。下面以doAcquireNanos为例来分析:

[java] view
plain copy

 print?

private boolean doAcquireNanos(int arg, long nanosTimeout)  

    throws InterruptedException {  

    long lastTime = System.nanoTime();  

    final Node node = addWaiter(Node.EXCLUSIVE);  

    boolean failed = true;  

    try {  

        for (;;) {  

            final Node p = node.predecessor();  

            if (p == head && tryAcquire(arg)) {  

                setHead(node);  

                p.next = null; // help GC  

                failed = false;  

                return true;  

            }  

            if (nanosTimeout <= 0)  

                return false;  

            if (shouldParkAfterFailedAcquire(p, node) &&  

                nanosTimeout > spinForTimeoutThreshold)  

                LockSupport.parkNanos(this, nanosTimeout);  

            long now = System.nanoTime();  

            nanosTimeout -= now - lastTime;  

            lastTime = now;  

            if (Thread.interrupted())  

                throw new InterruptedException();  

        }  

    } finally {  

        if (failed)  

            cancelAcquire(node);  

    }  

}  

1、首先取得当前系统时间,在循环等待的过程中,如果剩余时间<=0立即返回;
2、如果剩余时间>0,就用总时间减去一次循环耗费的时间,继续阻塞;
3、如果在这期间线程被中断,就抛出中断异常,如果有其他异常产生,就取消这次获取。

取消

取消获取的逻辑比较复杂,下面来分析一下:

[java] view
plain copy

 print?

private void cancelAcquire(Node node) {  

      // Ignore if node doesn't exist  

      if (node == null)  

          return;  

  

      node.thread = null;  

  

      // Skip cancelled predecessors  

      Node pred = node.prev;  

      while (pred.waitStatus > 0)  

          node.prev = pred = pred.prev;  

  

      // predNext is the apparent node to unsplice. CASes below will  

      // fail if not, in which case, we lost race vs another cancel  

      // or signal, so no further action is necessary.  

      Node predNext = pred.next;  

  

      // Can use unconditional write instead of CAS here.  

      // After this atomic step, other Nodes can skip past us.  

      // Before, we are free of interference from other threads.  

      node.waitStatus = Node.CANCELLED;  

  

      // If we are the tail, remove ourselves.  

      if (node == tail && compareAndSetTail(node, pred)) {  

          compareAndSetNext(pred, predNext, null);  

      } else {  

          // If successor needs signal, try to set pred's next-link  

          // so it will get one. Otherwise wake it up to propagate.  

          int ws;  

          if (pred != head &&  

              ((ws = pred.waitStatus) == Node.SIGNAL ||  

               (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&  

              pred.thread != null) {  

              Node next = node.next;  

              if (next != null && next.waitStatus <= 0)  

                  compareAndSetNext(pred, predNext, next);  

          } else {  

              unparkSuccessor(node);  

          }  

  

          node.next = node; // help GC  

      }  

  }  

1、首先取得当前结点的前趋结点,如果前趋结点也被取消直接跳过,继续向前找非取消的结点;
2、将当前结点设置为取消状态;
3、如果当前结点是队尾结点,则将当前结点从队尾移除;否则执行4;
4、找到当前结点的继任结点,前趋的next指针指向继任结点(pred->next=current->next);

5、当前结点的next指针指向自己,前面提到这一方面为了回收,一方面为了使isOnSyncQueue方法简单。

其他

AQS还提供了一些线程监控的方法:

[java] view
plain copy

 print?

//获取哪些线程在等待  

protected final Collection<Thread> getWaitingThreads();   

//获取等待队列的长度  

protected final int getWaitQueueLength();   

//是否有线程在等待  

protected final boolean hasWaiters()  

//是否拥有同步器  

final boolean isOwnedBy(AbstractQueuedSynchronizer sync)  

//是否在同步队列中  

final boolean isOnSyncQueue(Node node)  

//支持共享模式的线程  

public final Collection<Thread> getSharedQueuedThreads()  

//支持独占模式的线程  

public final Collection<Thread> getExclusiveQueuedThreads();  

参考资料:

Java线程中断的本质和编程原则
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: