您的位置:首页 > 编程语言 > Java开发

Java并发编程-27-异常处理及取消任务

2015-06-18 21:49 671 查看
一、取消任务

ForkJoinTask类提供的cancel()方法允许取消一个仍没有被执行的任务

取消一个任务并不会取消该任务已经发送到线程池的子任务,这些子任务将继续执行

Fork/Join框架的局限性在于,ForkJoinPool类没有提供一次性取消线程池中所有正在运行或等待执行的任务的方法

示例中:我们创建一个辅助类,它存储发送到线程池的所有任务,并提供一个方法来取消所存储的任务;如果某个任务已经在执行或者已经结束,那么它就无法被取消

package com.currency.forkandjoin;

import java.util.Random;

/**
 * Produces integer arrays of a requested length filled with random values.
 *
 * @author Nicholas
 */
public class ArrayGenerator {

    /**
     * Allocates an array and fills every slot with a random value in {@code [0, 10)}.
     *
     * @param size number of elements the returned array must have
     * @return a newly allocated array of {@code size} random values between 0 and 9
     */
    public int[] generatorArray(int size) {
        final int[] values = new int[size];
        final Random source = new Random();

        int index = 0;
        while (index < values.length) {
            values[index] = source.nextInt(10);
            index++;
        }
        return values;
    }
}

package com.currency.forkandjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;

/**
 * Keeps track of the tasks created during a fork/join computation so that they can be
 * cancelled together.
 *
 * <p>Tasks are registered and cancelled from code that runs inside a fork/join pool, so the
 * backing list may be touched by several worker threads at once. Every access to the list is
 * therefore guarded by the list's monitor, and cancellation works on a snapshot so that no lock
 * is held while calling into the tasks themselves.
 *
 * @author Nicholas
 */
public class TaskManager {

    /** Registered tasks; guarded by synchronizing on the list itself. */
    private final List<ForkJoinTask<Integer>> tasks;

    /** Creates a manager with no registered tasks. */
    public TaskManager() {
        tasks = new ArrayList<ForkJoinTask<Integer>>();
    }

    /**
     * Registers a task so that a later {@link #cancelTask(ForkJoinTask)} call can cancel it.
     *
     * @param task the task to remember
     */
    public void addTask(ForkJoinTask<Integer> task) {
        synchronized (tasks) {
            tasks.add(task);
        }
    }

    /**
     * Attempts to cancel every registered task except the one passed in.
     *
     * <p>The cancellation message is written only for tasks whose {@code cancel} call reports
     * success, and only for tasks that actually are {@code SearchNumberTask} instances.
     *
     * @param canceltask the task that must be left alone (compared by identity)
     */
    public void cancelTask(ForkJoinTask<Integer> canceltask) {
        // Copy under the lock, iterate outside it: other workers may still be registering
        // tasks, and iterating the live list would risk a ConcurrentModificationException.
        final List<ForkJoinTask<Integer>> snapshot;
        synchronized (tasks) {
            snapshot = new ArrayList<ForkJoinTask<Integer>>(tasks);
        }

        for (ForkJoinTask<Integer> candidate : snapshot) {
            if (candidate == canceltask) {
                continue;
            }
            boolean cancelled = candidate.cancel(true);
            if (cancelled && candidate instanceof SearchNumberTask) {
                ((SearchNumberTask) candidate).writeCancelMessage();
            }
        }
    }
}

package com.currency.forkandjoin;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/**
 * Fork/join task that searches the slice {@code [start, end)} of an array for a given number.
 *
 * <p>Slices longer than 10 elements are split in two halves that are forked as sub-tasks and
 * registered with the {@link TaskManager}; shorter slices are scanned directly. When a scan
 * finds the number, the task asks the manager to cancel the other registered tasks.
 */
public class SearchNumberTask extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;

    private final int[] numbers;
    private final int start;
    private final int end;
    private final int number;
    private final TaskManager taskManager;

    /**
     * @param numbers     array to search
     * @param start       first index of the slice (inclusive)
     * @param end         last index of the slice (exclusive)
     * @param number      value to look for
     * @param taskManager registry used to record sub-tasks and to cancel them
     */
    public SearchNumberTask(int[] numbers, int start, int end, int number,
            TaskManager taskManager) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
        this.number = number;
        this.taskManager = taskManager;
    }

    /**
     * Splits the slice when it holds more than 10 elements, otherwise scans it.
     *
     * @return the index at which the number was found, or -1 if it was not found
     */
    @Override
    protected Integer compute() {
        System.out.println("Task : " + start + " : " + end);
        if (end - start > 10) {
            return luanchTasks();
        }
        return lookForNumber();
    }

    /**
     * Scans the slice sequentially, pausing one second after each element that does not match.
     *
     * <p>If the thread is interrupted while pausing, the scan still runs to completion, but the
     * interrupt status is restored before returning so that callers can observe it.
     *
     * @return the index of the first match, or -1 if the slice does not contain the number
     */
    public Integer lookForNumber() {
        boolean interrupted = false;
        try {
            for (int i = start; i < end; i++) {
                if (numbers[i] == number) {
                    System.out.printf("Task : number %d found in position %d\n",
                            number, i);
                    taskManager.cancelTask(this);
                    return i;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // sleep() cleared the flag; remember it and re-assert it on the way out.
                    interrupted = true;
                }
            }
            return -1;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Splits the slice at its midpoint, forks one sub-task per half and joins them.
     *
     * <p>NOTE: {@code join()} rethrows a {@code CancellationException} when the joined sub-task
     * has been cancelled; that exception is not handled here and propagates to the caller.
     *
     * @return the result of the first half if it is not -1, otherwise the result of the
     *         second half
     */
    public int luanchTasks() {
        // Written as start + half-width so that the sum cannot overflow for large indices.
        int mid = start + (end - start) / 2;
        SearchNumberTask lowerHalf = new SearchNumberTask(numbers, start, mid,
                number, taskManager);
        SearchNumberTask upperHalf = new SearchNumberTask(numbers, mid, end,
                number, taskManager);

        taskManager.addTask(lowerHalf);
        taskManager.addTask(upperHalf);

        lowerHalf.fork();
        upperHalf.fork();

        int found = lowerHalf.join();
        if (found != -1) {
            return found;
        }
        return upperHalf.join();
    }

    /** Prints a line stating that the task covering this slice was cancelled. */
    public void writeCancelMessage() {
        System.out.printf("Task : Cancelled task from %d to %d\n", start, end);
    }
}

package com.currency.forkandjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
 * Demo driver for task cancellation: searches a random 1000-element array for the number 5
 * with a {@link SearchNumberTask} executed on a {@link ForkJoinPool}.
 */
public class Main2 {

    /**
     * Builds the input, submits the root task, shuts the pool down and waits for it to
     * terminate before printing the closing message.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int[] data = new ArrayGenerator().generatorArray(1000);
        TaskManager manager = new TaskManager();

        ForkJoinPool pool = new ForkJoinPool();
        SearchNumberTask rootTask = new SearchNumberTask(data, 0, 1000, 5, manager);

        pool.execute(rootTask);
        pool.shutdown();
        waitForTermination(pool);

        System.out.println("Main : The program has finished\n");
    }

    /** Blocks for up to one day until the pool has terminated; prints the trace if interrupted. */
    private static void waitForTermination(ForkJoinPool pool) {
        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


二、异常处理

package com.currency.forkandjoin;

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

/**
 * Fork/join task used to demonstrate exception handling.
 *
 * <p>Ranges of 10 or more elements are split in two halves that are run with
 * {@code invokeAll}. A range shorter than 10 elements throws a {@link RuntimeException} when
 * index 3 lies strictly between {@code start} and {@code end}; otherwise it pauses for one
 * second.
 */
public class Task extends RecursiveTask<Integer> {

    private static final long serialVersionUID = 1L;

    private final int[] array;
    private final int start;
    private final int end;

    /**
     * @param array array associated with the task (stored but not read by {@link #compute()})
     * @param start first index of the range (inclusive)
     * @param end   last index of the range (exclusive)
     */
    public Task(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    /**
     * Processes the range as described on the class.
     *
     * <p>If the thread is interrupted during the pause, the interrupt status is restored so
     * that the caller can still observe it.
     *
     * @return always 0
     * @throws RuntimeException for a range shorter than 10 with {@code start < 3 < end}
     */
    @Override
    protected Integer compute() {
        System.out.println("Task : start form " + start + " to " + end);
        if (end - start < 10) {
            if (3 > start && 3 < end) {
                throw new RuntimeException(
                        "This task throw an execption .Task : start form "
                                + start + " to " + end);
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // sleep() cleared the flag; put it back instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
            }
        } else {
            // Written as start + half-width so that the sum cannot overflow for large indices.
            int mid = start + (end - start) / 2;
            Task lowerHalf = new Task(array, start, mid);
            Task upperHalf = new Task(array, mid, end);
            invokeAll(lowerHalf, upperHalf);
        }
        System.out.println("Task : End form " + start + " to " + end);

        return 0;
    }

}

package com.currency.forkandjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

/**
 * Demo driver for exception handling: runs a {@link Task} over the range 0..100 on a
 * {@link ForkJoinPool} and reports whether it completed abnormally.
 */
public class Main {

    /**
     * Submits the root task, shuts the pool down, waits for termination and then prints either
     * the exception recorded on the task or its result.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Task rootTask = new Task(new int[100], 0, 100);

        ForkJoinPool pool = new ForkJoinPool();
        pool.execute(rootTask);
        pool.shutdown();

        try {
            pool.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        if (!rootTask.isCompletedAbnormally()) {
            System.out.println("Main : Result :" + rootTask.join());
            return;
        }
        System.out.println("Main : An execption has ocurred");
        System.out.println("Main : " + rootTask.getException());
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: