您的位置:首页 > 编程语言

并发编程之Callable和Future接口、FutureTask类

2013-11-27 11:28 471 查看
Thread、Runnable、Callable,其中Runnable实现的是void run()方法,Callable实现的是V call()方法,并且可以返回执行结果,其中Runnable可以提交给Thread来包装下,直接启动一个线程来执行,而Callable则一般都是提交给ExecuteService来执行。Executor就是Runnable和Callable的调度容器,Future就是对于具体的调度任务的执行结果进行查看,最为关键的是Future可以检查对应的任务是否已经完成,也可以阻塞在get方法上一直等待任务返回结果。Runnable和Callable的差别就是Runnable是没有结果可以返回的,并且Runnable无法抛出返回结果的异常,就算是通过Future也看不到任务调度的结果的。

FutureTask则实现了Runnbale又实现了Futrue<V>这两个接口,另外它还可以包装Runnable和Callable<V>,所以一般来讲是一个符合体了,它可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行,并且还可以通过v get()返回执行结果,在线程体没有执行完成的时候,主线程一直阻塞等待,执行完则直接返回结果。

Callable和Future接口示例程序:该程序是计算一个公司的年销售水泥的总数目,每一行代表一个客户,每一列代表一个客户在每个月内的购买数量,将每一个客户(每一行)看做一个小任务。每一个任务计算之后放入Future中,等待所有的计算完毕后,调用get(是一个阻塞方法,等待所有任务执行完毕)方法得到结果并计算总和。

import java.text.DateFormatSymbols;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AnnualSalesCalc {

private static int NUMBER_OF_CUSTOMERS = 100;
private static int NUMBER_OF_MONTHS = 12;
private static int salesMatrix[][];

private static class Summer implements Callable<Integer> {
private int companyID;

public Summer(int companyID) {
this.companyID = companyID;
}

@Override
public Integer call() {
int sum = 0;
for (int col = 0; col < NUMBER_OF_MONTHS; col++) {
sum += salesMatrix[companyID][col];
}
System.out.printf("Totaling for client 1%02d completed%n",
companyID);
return sum;
}
}

private static void generateMatrix() {
salesMatrix = new int[NUMBER_OF_CUSTOMERS][NUMBER_OF_MONTHS];
for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
for (int j = 0; j < NUMBER_OF_MONTHS; j++) {
salesMatrix[i][j] = (int) (Math.random() * 100);
}
}
}

private static void printMatrix() {
System.out.print("\t\t");
String[] monthDisplayNames = (new DateFormatSymbols()).getShortMonths();
for (String strName : monthDisplayNames) {
System.out.printf("%12s", strName);
}
System.out.println();
for (int i = 0; i < monthDisplayNames.length - 1; i++) {
System.out.print("=======");
}
System.out.println("====");
for (int i = 0; i < NUMBER_OF_CUSTOMERS; i++) {
System.out.printf("Client ID:1%02d%2s", i, "|");
for (int j = 0; j < NUMBER_OF_MONTHS; j++) {
System.out.printf("%6d", salesMatrix[i][j]);
}
System.out.println();
}
System.out.println();
}

public static void main(String[] args) throws InterruptedException,
ExecutionException {
generateMatrix();
printMatrix();
ExecutorService executor = Executors.newFixedThreadPool(10);
Set<Future<Integer>> set = new HashSet<Future<Integer>>();
for (int row = 0; row < NUMBER_OF_CUSTOMERS; row++) {
Callable<Integer> callable = new Summer(row);
Future<Integer> future = executor.submit(callable);
set.add(future);
}
int sum = 0;
for (Future<Integer> future : set) {
sum += future.get();
}
System.out.printf("%nThe annual turnover (bags): %s%n%n", sum);
executor.shutdown();
}
}
部分结果:

一月  二月 三月  四月  五月  六月  七月  八月  九月  十月 十一月 十二月
========================================================================================
Client ID:100 |     5    68    12    30    66    60    16    70    17    87    51    72
Client ID:101 |    44    67    55    10    29    65    15    48    54    83    49    34
Client ID:102 |    64    46    17    73    88    10     1    88    43     0    90    22
.......................................................................................
Client ID:197 |    98    83    40    51    98    95    90    85    49    56    12    41
Client ID:198 |    81     5     6    29    30    28    36    99    94    19     5    46
Client ID:199 |    65    71    84    62    76    72    23    27     2    24    62    82

Totaling for client 100 completed
Totaling for client 102 completed
Totaling for client 101 completed
................................
Totaling for client 103 completed
Totaling for client 198 completed
Totaling for client 197 completed
Totaling for client 196 completed

The annual turnover (bags): 59676
FutureTask示例程序:演示股票交易程序,一个懒惰线程随时可以取消提交的任务,如果订单已经完成取消失败,如果任务正在执行且可以中断则取消该线程剩余的处理流程,如果订单已提交并且在分配给线程执行之前被取消,则订单会取消成功。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class StocksOrderProcessor {
public static final int MAX_NUMBER_OF_ORDERS = 1000;
private static final ExecutorService executor = Executors
.newFixedThreadPool(100);
private static List<Future<Integer>> ordersToProcess = new ArrayList<Future<Integer>>();

private static class OrderExecutor implements Callable<Integer> {
private int id = 0;
private int count = 0;

public OrderExecutor(int id) {
this.id = id;
}

@Override
public Integer call() throws Exception {
while (count < 50) {
count++;
Thread.sleep(new Random(System.currentTimeMillis() % 100)
.nextInt(10));
}
System.out.println("Successfully executed order: " + id);
return id;
}
}

private static void submitOrder(int id) {
Callable<Integer> callable = new OrderExecutor(id);
ordersToProcess.add(executor.submit(callable));
}

public static void main(String[] args) {
System.out.printf("Submitting %d trades%n", MAX_NUMBER_OF_ORDERS);
for (int i = 0; i < MAX_NUMBER_OF_ORDERS; i++) {
submitOrder(i);
}
new Thread(new EvilThread(ordersToProcess)).start();
System.out.println("Cancelling a few orders at random");
try {
// 为了让所有任务都可以完成,让执行器等待30秒钟
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Checking status before shutdown");
int count = 0;
for (Future<Integer> future : ordersToProcess) {
if (future.isCancelled()) {
count++;
}
}
System.out.printf("%d trades cancelled%n", count);
// shutdown方法并不意味着之前提交的任务被立即取消,而是发起任务顺序停止以便之前提交的任务
// 可以被执行,但是他保证不再接受新的任务。ExecuteExistingDelayedTaskAfterShutdownPolicy
// 被设置为false意味着现有未完成的任务会被取消,反之,不会被取消。
// ContinueExistingPeriodicTasksAfterShutdownPolicy
// 被设置为true,那么已有的周期性任务会被取消。
executor.shutdown();
}

}

class EvilThread implements Runnable {
private List<Future<Integer>> ordersToProcess;

public EvilThread(List<Future<Integer>> futures) {
this.ordersToProcess = futures;
}

@Override
public void run() {
Random myNextKill = new Random(System.currentTimeMillis() % 100);
for (int i = 0; i < 100; i++) {
int index = myNextKill
.nextInt(StocksOrderProcessor.MAX_NUMBER_OF_ORDERS);
boolean cancel = ordersToProcess.get(index).cancel(true);
if (cancel == true) {
System.out.println("Cancel Order Succeed: " + index);
} else {
System.out.println("Cancel Order Failed: " + index);
}
try {
Thread.sleep(myNextKill.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
部分执行结果:

Submitting 1000 trades
Cancelling a few orders at random
Cancel Order Succeed: 888
Cancel Order Succeed: 765
Successfully executed order: 87
Cancel Order Succeed: 205
Successfully executed order: 689
Cancel Order Failed: 553
Cancel Order Succeed: 898
Cancel Order Failed: 15
Successfully executed order: 801
...............................
Successfully executed order: 847
Cancel Order Succeed: 859
Successfully executed order: 721
..............................
Successfully executed order: 799
Successfully executed order: 981
...............................
Successfully executed order: 999
Successfully executed order: 997
Cancel Order Failed: 306
。。。。。。。。。。。。。
Cancel Order Failed: 480
Cancel Order Failed: 304
Cancel Order Failed: 56
Cancel Order Failed: 941
Cancel Order Failed: 328
Checking status before shutdown
25 trades cancelled
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: