您的位置:首页 > 编程语言 > Java开发

Java Future/FutureTask

2014-05-29 14:42 176 查看

1、Future和FutureTask能干什么

1)可以用于等待线程结束,并且从线程中获取结果。

2)可以用于取消线程的执行。当然这个可以通过interupt来实现,但是使用Future框架更加简便。

例子:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {
public static void main(String[] args) {
final FutureTask<Object> f = new FutureTask<Object>(
new Callable<Object>() {

@Override
public Object call() {
try {
Thread.currentThread().sleep(50000);
} catch (InterruptedException e) {
System.out.println("here");
}
return new Object();
}
});

new Thread(f).start();

try {
Thread.currentThread().sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
f.cancel(true);
}
}


线程执行后,会被挂起50s。然而10s后,在主线程中调用了f.cancel()取消了线程的执行,因此线程的执行被取消。

2、FutureTask分析

public class FutureTask<V> implements RunnableFuture<V> {
public interface RunnableFuture<V> extends Runnable, Future<V> {
public interface Future<V> {


FutureTask是Runnable接口实现,因此可以看到上面的例子中new Thread(f).start()。
FutureTask的所有并发控制都是依靠的AbstractQueuedSynchronizer。来看看FutureTask是如何工作的。将FutrueTask传入到Thread中,当线程启动的时候,会调用FutureTask的方法run,
public void run() {
sync.innerRun();
}


可以看到run只做了一件事情,就是调用了sync的innerRun方法。来看看sync的定义,
private final Sync sync;
private final class Sync extends AbstractQueuedSynchronizer {
来看看innerRun方法是如何实现的,
void innerRun() {
// 利用原子操作更新state状态,如果当前状态时READY,则更新为RUNNING
// 1)如果更新成功,则继续往下执行
// 2)否则,直接返回。任务运行失败
if (!compareAndSetState(READY, RUNNING))
return;

// 获得当前的线程,存储到runner中
runner = Thread.currentThread();
// 重新检查当前的状态是不是RUNNING,比如调用了cancel方法后,使得状态成为了CANCELLED
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
// 调用实际的callable中的call方法,执行任务,并且获得返回值
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
// 设置结果
set(result);
} else {
// 释放锁
releaseShared(0); // cancel
}
}


可以看到前面的原子操作有两个作用,
1)防止线程的重复运行,一次运行后利用CAS操作将state设置为了RUNNING,如果第二次运行run(),原子操作就会出错,这样就防止了线程的重复运行。
2)防止线程被取消后,还往下运行。调用了cancel后,state被设置为了CANCELLED,这时运行到run的原子操作,就会出错,防止线程继续往下执行。

后面就是获取当前线程,然后调用实质的方法,并且将结果保存下来。

这样调用get方法的时候,就可以或得到结果result。
继续来看一下cancel方法是如何完成线程任务取消的。

boolean innerCancel(boolean mayInterruptIfRunning) {
// 循环,确保原子操作能够正确设置state为CANCELLED
for (;;) {
int s = getState();
// 判断当前任务是否已经完成或者已经被取消了,如果是则直接返回
if (ranOrCancelled(s))
return false;
// 否则 ,原子操作更新state为CANCELLED
// 1)更新成功,则跳出循环
// 2)更新失败,则继续循环,确保取消工作能够正确执行。
if (compareAndSetState(s, CANCELLED))
break;
}
// mayInterruptIfRunning == true
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
// 调用线程的中断函数
r.interrupt();
}
// 释放锁
releaseShared(0);
done();
return true;
}


注意这里面调用了线程的中断函数。我们知道线程的中断函数并不会立刻中断函数,而是更新了线程的标志位,由线程在适当的阶段响应。修改上面的例子如下,

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {
public static void main(String[] args) {
final FutureTask<Object> f = new FutureTask<Object>(
new Callable<Object>() {

@Override
public Object call() {
int i = 0;
while (i < 10) {
int j = 0;
while (j < Integer.MAX_VALUE) {
j++;
}
System.out.println(j);
i++;
}
return new Object();
}
});

new Thread(f).start();

try {
Thread.currentThread().sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
f.cancel(true);
}
}


运行后可以看到如下结果,

2147483647
2147483647
2147483647
2147483647
2147483647
2147483647
2147483647
2147483647
2147483647
2147483647


和之前的例子不一样,cancel函数并没有起到它应有的作用,也就是线程实际上并没有被取消。看看JDK文档中关于interrupt函数的定义,

Interrupts this thread.

Unless the current thread is interrupting itself, which is always permitted, the checkAccess method of this thread is invoked, which may cause a SecurityException to be thrown.

If this thread is blocked in an invocation of the wait(), wait(long), or wait(long, int) methods of the Object class, or of the join(), join(long), join(long, int), sleep(long), or sleep(long, int), methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException.

If this thread is blocked in an I/O operation upon an interruptible channel then the channel will be closed, the thread's interrupt status will be set, and the thread will receive a java.nio.channels.ClosedByInterruptException.

If this thread is blocked in a java.nio.channels.Selector then the thread's interrupt status will be set and it will return immediately from the selection operation, possibly with a non-zero value, just as if the selector's wakeup method were invoked.

If none of the previous conditions hold then this thread's interrupt status will be set.

Interrupting a thread that is not alive need not have any effect.


看一下这句话,If none of the previous conditions hold then this thread's interrupt status will be set.我们修改的例子正好符合这句话,没有任何的阻塞操作,所以调用Interrupt的时候并没有实现取消操作。那么应该如何修改呢?

修改循环条件如下,

while (i < 10
&& !Thread.currentThread().isInterrupted()) {
int j = 0;
while (j < Integer.MAX_VALUE
&& !Thread.currentThread().isInterrupted()) {
j++;
}
System.out.println(j);
i++;
}
就可以实现取消操作了。

这给了我们启示,如果想在Future和FutureTask中实现取消操作,也需要正确的利用线程的标志位Thread.currentThread().isInterrupted()方法。否则可能会出现无法取消的情况。

来看看get是如何实现的。

V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}


可以看到已经被取消的线程是不能通过get方法获得结果的,否则会抛出CancellationException。而其主要的功能都在acquireSharedInterruptibly中完成,

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}


首先检查线程是否已经被取消(中断),

/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1;
}


如果任务还没有完成,则返回-1,会调用doAcquireSharedInterruptibly阻塞get方法。

而innerRun中最后的releaseShared(0)会取消阻塞,使得get函数返回result。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: