您的位置:首页 > 编程语言

并发编程 08—— 任务取消 之 中断

2014-09-26 11:14 183 查看
Java并发编程实践 目录

并发编程 01—— ThreadLocal

并发编程 02—— ConcurrentHashMap

并发编程 03—— 阻塞队列和生产者-消费者模式

并发编程 04—— 闭锁CountDownLatch 与 栅栏CyclicBarrier

并发编程 05—— Callable和Future

并发编程 06—— CompletionService : Executor 和 BlockingQueue

并发编程 07—— 任务取消

并发编程 08—— 任务取消 之 中断

并发编程 09—— 任务取消 之 停止基于线程的服务

并发编程 10—— 任务取消 之 关闭 ExecutorService

并发编程 11—— 任务取消 之 “毒丸”对象

并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性

并发编程 13—— 线程池的使用 之 配置ThreadPoolExecutor 和 饱和策略

并发编程 14—— 线程池 之 整体架构

并发编程 15—— 线程池 之 原理一

并发编程 16—— 线程池 之 原理二

并发编程 17—— Lock

并发编程 18—— 使用内置条件队列实现简单的有界缓存

并发编程 19—— 显式的Conditon 对象

并发编程 20—— AbstractQueuedSynchronizer 深入分析

并发编程 21—— 原子变量和非阻塞同步机制

概述

第1部分 问题引入

第2部分 中断

2.1 中断

2.2 中断策略

2.3 响应中断

2.4 实例——计时任务的取消

2.5 通过 Future 来实现取消

2.6 处理不可中断的阻塞

2.7 采用 newTaskFor 来封装非标准的取消

参考

第1部分 问题引入

  上一篇 并发编程—— 任务取消 中,PrimeGenerator 的取消机制最终会使得搜索素数的任务退出,但在退出过程中需要花费一定的时间。如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会结束。

在下面的程序中,BrokenPrimeProducer 就说明了这个问题。生产者线程生产素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put 方法也会阻塞。当生产者在put 方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况呢?它可以调用cancel 方法来设置cancelled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put 方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。

/**
* 7.3 不可靠的取消操作将把生产者置于阻塞的操作中
* @ClassName: BrokenPrimeProducer
* TODO
* @author Xingle
* @date 2014-9-30 上午9:55:56
*/
public class BrokenPrimeProducer extends Thread{

private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;

public BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
this.queue = queue;
}

public void run(){
BigInteger p = BigInteger.ONE;
while(!cancelled){
try {
queue.put(p= p.nextProbablePrime());
System.out.println(Thread.currentThread().getName()+"生产数字:"+p);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName()+"线程中断");
}
}
}

public void cancel(){
this.cancelled = true;
}
}


以上的测试程序:

public class BrokenPrimeProducer_Main {

public static void main(String[] args){
BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<>(3);
BrokenPrimeProducer producer = new BrokenPrimeProducer(queue);
producer.start();
while(true){
try {
System.out.println(Thread.currentThread().getName()
+"消费数据"+queue.take());// 从队列取出一个数
TimeUnit.SECONDS.sleep(1);// 停止1s,显示出消费速度慢于生产速度 
producer.cancel();// 消费者请求停止生产 

} catch (InterruptedException e) {
System.out.println("被中断了");
}
}
}
}


执行结果:

线程将不会停止,而是一直阻塞到这个地方



第2部分 中断

2.1 中断

下面是一个改进的例子,用中断来进行线程的停止。

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

  在java 的API 或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。

每个线程都有一个Boolean 类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。在Thread 中包含了中断线程以及查询线程中断状态的方法。interrupt方法能中断目标线程,而isInterrupt方法能返回目标线程的中断状态。静态的intertupt 方法将清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法。

public void interrupt()
中断线程。
public boolean isInterrupted()
测试线程是否已经中断。
public static boolean interrupted()
测试当前线程是否已经中断。


调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。

  对中断的正确理解是:它并不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)。在使用静态的interruptd时应该小心,因为它会清除当前线程的中断状态。如果在调用interruptd时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理——可以抛出 InterruptedException,或者通过再次调用 interrupt 来恢复中断状态。

通常,中断是实现取消的最合理方式。

BrokenPrimeProducer 中的问题很容易解决:使用中断而不是boolean标志累请求取消,如下面的程序所示。

/**
* 7.5 通过中断来取消
*
* @ClassName: PrimeProducer
* @author Xingle
* @date 2014-9-26 下午6:48:22
*/
public class PrimeProducer extends Thread {

private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().interrupted()) {
queue.put(p = p.nextProbablePrime());
System.out.println(Thread.currentThread().getName() + " 生产数字 " + p);
}
} catch (InterruptedException e) {
/*允许线程退出*/
System.out.println(Thread.currentThread().getName() + " 线程中断了");
System.out.println(Thread.currentThread().isInterrupted());
}
}

public void cancel() {
interrupt();
}
}


测试程序:

public class PrimeGeneratorMain {

public static void main(String[] args){
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
Thread.sleep(100);
Thread.currentThread().interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
generator.cancel();
}
List<BigInteger> ls = generator.get();
for(int i= 0;i<ls.size();i++){
System.out.println(ls.get(i));
}
}
}


执行结果:



2.2 中断策略

  中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度来响应中断。

  最合理的中断策略是某种形式的线程级取消操作或服务级取消操作;尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。

  任务不会在其自己拥有的线程中执行,而是在某个服务(例如线程池)拥有的线程中执行。对于非线程所有者的代码来说(例如,对于线程池而言,任何在线程池实现以外的代码),应该小心地保存中断状态,这样拥有线程的代码才能对中断做出响应,即使“非所有者”代码也可以做出响应。

  这就是为什么大多数可阻塞的库函数都只是抛出 InterruptedException作出中断响应。它们永远不会在某个自己拥有的线程中运行,因此它们为任务或库代码实现了最合理的取消策略:尽快退出流程,并把中断信息传递给调用者,从而使调用栈中的上层代码可以采取进一步的操作。

  任务不应该对执行该任务的线程的中断策略做出任何假设,除非该任务被专门设计为在服务中运行,并且在这些服务中包含特定的中断策略。无论任务把中断视为取消,还是其他某个中断响应操作,都应该小心地保存执行线程的中断状态。如果除了将 InterruptedException 传递给调用者外还需要执行其他操作,那么应该在捕获 InterruptedException 之后恢复中断状态:

Thread.currentThread().interrupt();


  正如任务代码不应该对其执行所在的线程的中断策略做出假设,执行取消操作的代码也不应该对线程的中断策略做出假设。线程应该只能由其所有者中断,所有者可以将线程的中断策略信心封装到某个合适的取消机制中,例如关闭(shutdown)方法。

2.3 响应中断

有两种实用策略可用于处理 InterruptedException:

传递异常(可能在执行某个特定于任务的清除操作之后),从而使你的方法也成为可中断的阻塞方法。

恢复中断状态,从而使调用栈中的上层代码能够对其进行处理。

传递 InterruptedException 与将 InterruptedException 添加到throws 子句中一样容易,如下所示:

//将 InterruptedException 传递给调用者
BlockingQueue<Task> queue;
public Task getNextTask() throws InterruptedException{
return queue.take();
}


  如果不想或无法传递 InterruptedException (或许通过Runnable来定义任务),那么需要寻找另一种方式来保存中断请求。一种标准的方法就是通过再次调用 interrupt 来恢复中断状态。

只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规的任务和库代码中都不应该屏蔽中断请求。

对于一些不支持取消但仍可以调用可中断阻塞方法的操作,它们必须在循环中调用这些方法,并在发现中断后重新尝试。在这种情况下,它们应该在本地保存中断状态,并在返回前恢复状态而不是捕获 InterruptException 时恢复状态,如下所示:

/**
* 不可取消的任务在退出前恢复中断
*
*/
public class NoncancelableTask {
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// 重新尝试
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}

interface Task {
}
}


2.4 实例——计时任务的取消

  在上一节并发编程—— 任务取消 中 PrimeGeneratorMain 方法将启动一个 PrimeGenerator ,并在100ms 后中断。尽管PrimeGenerator 可能需要超过100ms 的时间才能停止,但它最终会发现中断,然后停止,并使线程结束。在执行任务时的另一个方面是,你希望知道在任务执行过程中是否会抛出异常,如果 PrimeGenerator 在指定时限内抛出了一个未检查的异常,那么这个异常可能会被忽略,因为素数生成器在另一个独立的线程中运行,而这个线程并不会显示地处理异常。

  在下面的程序中,给出了在指定时间内运行一个任意的Runnable 的示例。它在调用线程中运行任务,并安排了一个取消任务,在运行指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,因为该异常会被 timedRun 的调用者捕获。

/**
* 7.8 在外部线程中安排中断(不要这样做)
* @ClassName: TimedRun1
* @author Administrator
* @date 2014-10-20 下午2:50:29
*/
public class TimedRun1 {

private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1);

public static void timedRun(Runnable r,long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
System.out.println("1--"+taskThread.isInterrupted());
}
}, timeout, unit);
r.run();
System.out.println("2--"+taskThread.isInterrupted());
}
}


  这是一种非常简单的方法,但却破坏了以下规则:在中断线程之前,应该了解它的中断策略。由于 timedRun 可以从任意一个线程中调用,因此它无法知道这个调用线程的中断策略。如果任务在超时之前完成,那么中断timedRun 所在线程的取消任务将在 timedRun 返回到调用者之后启动。我们不知道在这种情况下将运行什么代码,但结果一定是不好的。

测试程序:

public class TimedRun_Main {
public static void main(String[] args) {
TimedRun1 timeRun1 = new TimedRun1();
Runnable run = new Runnable() {

@Override
public void run() {
int i = 0;
for (int j = 0; j < 100000000; j++) {
i++;
if (i % 10000000 == 0) {
System.out.println(i + "  "+ Thread.currentThread().getName());
}
}
}
};
timeRun1.timedRun(run, 1, TimeUnit.MILLISECONDS);
}
}


执行结果:



  而且,如果任务不响应中断,那么 timedRun 会在任务结束时才返回,此时可能已经超过了指定的时限(或者还没有超过时限)。如果某个限时运行的服务没有在指定的时间内返回,那么将对调用者带来负面影响。

  在下面的程序中解决了最开始的异常处理问题以及上面解决方案中的问题。执行任务的线程拥有自己的执行策略,即使任务不响应中断,限时运行的方法仍能返回到它的调用者。在启动任务线程之后,timedRun 将执行一个限时的 join 方法。在join返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用timedRun 的线程中再次抛出该异常。由于 Throwable 将在两个线程之间共享,因此该变量被声明为 volatile类型,从而确保安全地将其从任务线程发布到timedRun线程。

/**
* 7.9 在专门的线程中中断任务
* @ClassName: TimedRun2
* @author Administrator
* @date 2014-10-17 下午7:41:19
*/
public class TimedRun2 {
private static final ScheduledExecutorService cancelExec = Executors
.newScheduledThreadPool(1);

public static void timedRun(final Runnable r, long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;

public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}

void rethrow() {
if (t != null)
try {
throw launderThrowable(t);
} catch (Exception e) {
e.printStackTrace();
}
}
}

RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
System.out.println("1--" + taskThread.isInterrupted());
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
System.out.println("2--" + taskThread.isInterrupted());
}

public static Exception launderThrowable(Throwable t) {
if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}


执行同样的程序:

public class TimedRun_Main {
public static void main(String[] args) {
TimedRun2 timeRun = new TimedRun2();
Runnable run = new Runnable() {

@Override
public void run() {
int i = 0;
for (int j = 0; j < 100000000; j++) {
i++;
if (i % 10000000 == 0) {
System.out.println(i + "  "+ Thread.currentThread().getName());
}
}
}
};
try {
timeRun.timedRun(run, 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


结果:



  在这个示例的代码中解决了前面示例中的问题,但由于它依赖一个限时的 join ,因此存在着join的不足,无法知道执行控制是因为线程正常退出而返回还是因为 join 超时而返回。

2.5 通过 Future 来实现取消

前面的例子都是直接使用runnable来执行本身,所以如果要取消任务的话只能使用wait join sleep与Interrupt来组合取消任务。

  其实 Future 早已经提供这样的功能 ,ExecutorService.submit 将返回一个 Future 来描述任务。Future 拥有一个cancel 方法,该方法带有一个 boolean 类型的参数 mayinterruptIfRunning,表示取消操作是否成功。(这只是表示任务是否能接受中断,而不是表示任务是否能检测并处理中断。)如果 mayinterruptIfRunning 为 true 并且任务当前正在某个线程中运行,那么这个线程能被中断。如果这个参数为 false,那么意味着“若任务还没有启动,就不要运行它”,这种方式应该用于那些不处理中断的任务中。

下面程序给出了另一个版本的 timedRun:将任务提交给一个 ExecutorService ,并通过一个定时的 Future.get 来获取结果。如果 get 在返回时抛出一个 TimeoutException,那么任务将通过它的 Future 来取消。如果任务在被取消前就抛出一个异常,那么该异常将被重新抛出以便由调用者来处理异常。

/**
* 7.10 通过Future 来取消任务
*
*/
public class TimedRun {
private static final ExecutorService taskExec = Executors.newCachedThreadPool();

public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// task will be cancelled below
} catch (ExecutionException e) {
// exception thrown in task; rethrow
throw launderThrowable(e.getCause());
} finally {
// Harmless if task already completed
task.cancel(true); // interrupt if running
}
}
}


当Future.get 抛出InterruptedException 或 TimeoutException 时,如果你知道不再需要结果,那么就可以调用 Future.cancel 来取消。

实例:

/**
*
* @ClassName: Task
* TODO
* @author xingle
* @date 2014-10-22 下午12:10:49
*/
public class Task  implements Callable<String>{

//创建Task类,指定实现Callable接口,并参数化为String类型。
//实现call()方法,写入一条信息到控制台,并使这个线程在循环中睡眠100毫秒。
@Override
public String call() throws Exception {
while (true) {
System.out.println("我在执行任务: Test 来自"+Thread.currentThread().getName()+"\n");
Thread.sleep(100);
}
}
}


测试程序:

/**
* 通过 Future 来取消任务
* @ClassName: Task_Main
* TODO
* @author xingle
* @date 2014-10-22 下午12:11:53
*/
public class Task_Main {
public static final ScheduledExecutorService executor = Executors
.newScheduledThreadPool(1);

public static void main(String[] args) {
Task task = new Task();
System.out.printf("Main: 开始\n");
Future<String> future = executor.submit(task);
try {
future.get(300, TimeUnit.MILLISECONDS);//设置超时执行时间
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
//如果在任务中抛出了异常,那么重新抛出该异常
throw launderThrowable(e.getCause());
} catch (TimeoutException e) {
e.printStackTrace();
//接下来任务将被取消
} finally {
System.out.printf("执行取消任务 \n");
future.cancel(true);//如果任务正在运行,那么将被中断
}

//将isCancelled()方法和isDone()的调用结果写入控制台,验证任务已取消,因此,已完成。
System.out.printf("Canceled: "+ future.isCancelled()+"\n");
System.out.printf("Done: "+ future.isDone()+"\n");
//
executor.shutdown();
System.out.printf("The executor has finished\n");

}
public static RuntimeException launderThrowable(Throwable t) {

if (t instanceof RuntimeException)
return (RuntimeException) t;
else if (t instanceof Error)
throw (Error) t;
else
throw new IllegalStateException("Not unchecked", t);
}
}


执行结果:



2.6 处理不可中断的阻塞

  在java库中,许多可阻塞的方法都是通过提前返回或者抛出 InterruptedException 来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而,并非所有的可阻塞方法或者阻塞机制都能响应中断:

造成线程阻塞的原因:

1. java.io包中的同步Socket I/O。如套接字中进行读写操作read, write方法。

2. java.io包中的同步I/O。如当中断或关闭正在InterruptibleChannel上等待的线程时,会对应抛出ClosedByInterruptException或 AsynchronousCloseException。

3. Selector的异步I/O。如果一个线程在调用Selector.select时阻塞了,那么调用close, wakeup会使线程抛出ClosedSelectorException。

4. 获取某个锁。当一个线程等待某个锁而阻塞时,不会响应中断。但Lock类的lockInterruptibly允许在等待锁时响应中断。

/**
* 7.11 通过改写 interrupt 方法将非标准的取消操作封装在 Thread 中
* @ClassName: ReaderThread
* @author xingle
* @date 2014-10-24 上午9:05:56
*/
public class ReaderThread extends Thread{

private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;

public ReaderThread(Socket socket) throws IOException{
this.socket = socket;
this.in = socket.getInputStream();
}

public void interrupt(){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
} finally{
super.interrupt();
             }
}

public void run(){
byte[] buf = new byte[BUFSZ];
while(true){
try {
int count = in.read(buf);
if (count < 0){
break;
}else if(count >0 ){
processBuffer(buf, count);
}

} catch (IOException e) {
//允许线程退出
}

}
}

private void processBuffer(byte[] buf, int count) {
// TODO Auto-generated method stub

}
}


2.7 采用 newTaskFor 来封装非标准的取消

  我们可以通过 newTaskFor 方法来进一步优化 ReaderThead 中封装非标准取消的技术,这是 Java 6 在 ThreadPoolExecutor 中的新增功能。当把一个 Callable 提交给 ExecutorService 时,submit 方法会返回一个 Future ,我们可以通过这个 Future 来取消任务。newTaskFor 是一个工厂方法,它将创建 Future 来代表任务。

  通过定制表示任务的 Future 可以改变Future.cancel 的行为。例如,定制的取消代码可以实现日志记录或者收集取消操作的统计信息,以及取消一些不响应中断的操作。通过改写 interrupt 方法,ReaderThead 可以取消基于套接字的线程。同样,通过改写任务的 Future.cancel 方法也可以实现类似的功能。

  在下面的程序中,定义了一个CancellableTask 接口,该接口扩展了 Callable,并增加了一个 cancel 方法和一个 newTask 工厂方法来构造RunnableFuture 。CancellingExecutor 扩展了 ThreadPoolExecutor ,并通过改写 newTaskFor 使得 CancellableTask 可以创建自己的 Future.

/**
* 7.12 通过 newTaskFor 将非标准的取消操作封装在一个任务中
*
* @ClassName: SocketUsingTask
* @author xingle
* @date 2014-10-24 下午2:27:07
*/
public class SocketUsingTask<T> implements CancellableTask<T> {

@GuardedBy("this")
private Socket socket;

protected synchronized void setSocket(Socket socket) {
this.socket = socket;
}

@Override
public T call() throws Exception {
//do working
return null;
}

@Override
public void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}

/**
* 通过newTaskFor将非标准的取消操作封装在任务中
*/
public class CancellingExecutor extends ThreadPoolExecutor {

/**
* @param corePoolSize
* @param maximumPoolSize
* @param keepAliveTime
* @param unit
* @param workQueue
*/
public CancellingExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
// TODO Auto-generated constructor stub
}

@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask) { // 若是我们定制的可取消任务
return ((CancellableTask<T>) callable).newTask();
}
return super.newTaskFor(callable);
}
}

}

/**
* 可取消的任务接口
*/
interface CancellableTask<T> extends Callable<T> {
void cancel();

RunnableFuture<T> newTask();
}


  SocketUsingTask 实现了 CancellableTask,并定义了Future.cancel 来关闭套接字和调用 super.cancel。如果 SocketUsingTask 使用自己的 Future 来取消,那么底层的套接字将被关闭并且线程将被中断。

参考

1. 《并发编程实战》 第7章

2. 《Java并发编程》之五:取消和关闭线程
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: