您的位置:首页 > 编程语言 > Java开发

Java并行程序基础(七)

2017-03-28 00:00 225 查看
Fork/Join框架

在jdk中,给出了一个ForkJoinPool线程池,对于fork()方法并不着急开启线程,而是提交给ForkJoinPool线程池进行处理,以节省系统资源。使用Fork/Join进行数据处理时的总体结构如图:


Fork/join执行逻辑



线程A已经把自己的任务都执行完成了,而线程B还有一堆任务等着处理,此时,线程A就会帮助线程B,从线程B的任务队列中拿一个任务过来处理,尽可能达到平衡。上图显示了这种互相帮助的精神。当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试图执行自己的任务时,则是从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。

ForkJoinPool有一个重要的接口:

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)

ForkJoinTask有两个重要的子类,RecursiveAction和RecursiveTask,他们分别表示没有返回值的任务和可以携带返回值的任务。

package com.thread.t02;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long>{

private static final int THRESHOLD=10000;
private long start;
private long end;

public CountTask(long start, long end) {
super();
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
long sum=0;
boolean canCompute = (end-start)<THRESHOLD;
if(canCompute){
for (long i = start; i <=end; i++) {
sum+=i;
}
}else{
long step = (start+end)/100;
ArrayList<CountTask> subTasks = new ArrayList<>();
long pos = start;
for(int i=0;i<100;i++){
long lastOne = pos+step;
if(lastOne>end) lastOne=end;
CountTask subTask = new CountTask(pos, lastOne);
pos+=step+1;
subTasks.add(subTask);
subTask.fork();
}
for(CountTask t:subTasks){
sum+=t.join();
}
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(0, 20000L);
ForkJoinTask<Long> result = forkJoinPool.submit(task);

long res;
try {
res = result.get();
System.out.println("sum="+res);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: