您的位置:首页 > 编程语言 > Java开发

多线程(11)-Fork/Join-Java并行计算框架

2016-08-03 00:00 489 查看
摘要: Fork/Join-Java并行计算框架

前文

并行计算在处处都有大数据的今天已经不是一个新鲜的词汇了,现在已经有单机多核甚至多机集群并行计算,注意,这里说的是并行,而不是并发。

并发(concurrency):使多个操作可以在重叠的时间段内进行。

并行(parallesim):就是同时执行的意思。判断程序是否处于并行的状态,就看同一时刻是否有超过一个“工作单位”在运行就好了。所以,单线程永远无法达到并行状态。

为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
如下面的示意图所示:



Java Fork&Join

Java在JDK7之后加入了并行计算的框架Fork/Join,可以解决我们系统中大数据计算的性能问题。Fork/Join采用的是分治法,Fork是将一个大任务拆分成若干个子任务,子任务分别去计算,而Join是获取到子任务的计算结果,然后合并,这个是递归的过程。子任务被分配到不同的核上执行时,效率最高。

但是使用Fork/Join的时候,要注意一些事项

除了fork() 和 join()方法外,线程不得使用其他的同步工具。线程最好也不要sleep()

线程不得进行I/O操作

线程不得抛出checked exception

实现类

ForkJoinPool 实现了Fork/Join的线程池,原理则是实现了工作窃取算法

ForkJoinTask ForkJoinTask代表一个可以并行、合并的任务,是一个抽象类,有两个子类

RecursiveTask 继承自ForkJoinTask,代表有返回值的任务

RecursiveAction 继承自ForkJoinTask,代表无返回值的任务

无返回值的示例

package wang.conge.javasedemo.core.thread;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

public class RecursiveActionTest {

public static void main(String[] args) throws InterruptedException {
//创建ForkJoinPool线程池,原理则是返回一个ForkJoinPool的实例
ForkJoinPool  forkjoinPool = new ForkJoinPool();

int[] nums = {0,1,2,3,4,5,6,7,8};

forkjoinPool.submit(new NumPrintRecursiveAction(nums));

forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);
}

static class NumPrintRecursiveAction extends RecursiveAction{
private static final long serialVersionUID = 1L;
private int[] nums;

public NumPrintRecursiveAction(int[] nums){
this.nums = nums;
}

@Override
protected void compute() {
//如果数组的length小于4,直接执行
if(nums.length<4){
for(int num:nums){
System.out.println(num);
}
return;
}

//如果当前数组的length大于4,分成两个任务继续执行
int mid = nums.length/2;

int[] numsLeft = Arrays.copyOfRange(nums, 0, mid);
int[] numsRight = Arrays.copyOfRange(nums, mid, nums.length);

NumPrintRecursiveAction left = new NumPrintRecursiveAction(numsLeft);
NumPrintRecursiveAction right = new NumPrintRecursiveAction(numsRight);

//并发执行两个子任务
left.fork();
right.fork();
}

}
}

有返回值的示例

package wang.conge.javasedemo.core.thread;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class RecursiveActionTest {

public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建ForkJoinPool线程池,原理则是返回一个ForkJoinPool的实例
ForkJoinPool  forkjoinPool = new ForkJoinPool();

int[] nums = {0,1,2,3,4,5,6,7,8};

Future<Integer> future = forkjoinPool.submit(new NumCountRecursiveAction(nums));
System.out.println(future.get());

forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);
}

static class NumCountRecursiveAction extends RecursiveTask<Integer>{
private static final long serialVersionUID = 1L;
private int[] nums;

public NumCountRecursiveAction(int[] nums){
this.nums = nums;
}

@Override
protected Integer compute() {
//如果数组的length小于4,直接执行
if(nums.length<4){
int total = 0;
for(int num:nums){
total = total + num;
}
return total;
}

//如果当前数组的length大于4,分成两个任务继续执行
int mid = nums.length/2;

int[] numsLeft = Arrays.copyOfRange(nums, 0, mid);
int[] numsRight = Arrays.copyOfRange(nums, mid, nums.length);

NumCountRecursiveAction left = new NumCountRecursiveAction(numsLeft);
NumCountRecursiveAction right = new NumCountRecursiveAction(numsRight);

//并发执行两个子任务
left.fork();
right.fork();

//汇总结果
return left.join() + right.join();
}

}
}

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:



那么为什么需要使用工作窃取算法呢?

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应

比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。

而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

引用文章

并发和并行

JDK 7 中的 Fork/Join 模式

Fork/Join框架介绍
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: