多线程(11)-Fork/Join-Java并行计算框架
2016-08-03 00:00
489 查看
摘要: Fork/Join-Java并行计算框架
并发(concurrency):使多个操作可以在重叠的时间段内进行。
并行(parallesim):就是同时执行的意思。判断程序是否处于并行的状态,就看同一时刻是否有超过一个“工作单位”在运行就好了。所以,单线程永远无法达到并行状态。
为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
如下面的示意图所示:
但是使用Fork/Join的时候,要注意一些事项
除了fork() 和 join()方法外,线程不得使用其他的同步工具。线程最好也不要sleep()
线程不得进行I/O操作
线程不得抛出checked exception
ForkJoinTask ForkJoinTask代表一个可以并行、合并的任务,是一个抽象类,有两个子类
RecursiveTask 继承自ForkJoinTask,代表有返回值的任务
RecursiveAction 继承自ForkJoinTask,代表无返回值的任务
那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应
比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
JDK 7 中的 Fork/Join 模式
Fork/Join框架介绍
前文
并行计算在处处都有大数据的今天已经不是一个新鲜的词汇了,现在已经有单机多核甚至多机集群并行计算,注意,这里说的是并行,而不是并发。并发(concurrency):使多个操作可以在重叠的时间段内进行。
并行(parallesim):就是同时执行的意思。判断程序是否处于并行的状态,就看同一时刻是否有超过一个“工作单位”在运行就好了。所以,单线程永远无法达到并行状态。
为了充分利用多CPU、多核CPU的性能优势,级软基软件系统应该可以充分“挖掘”每个CPU的计算能力,决不能让某个CPU处于“空闲”状态。为此,可以考虑把一个任务拆分成多个“小任务”,把多个"小任务"放到多个处理器核心上并行执行。当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。
如下面的示意图所示:
Java Fork&Join
Java在JDK7之后加入了并行计算的框架Fork/Join,可以解决我们系统中大数据计算的性能问题。Fork/Join采用的是分治法,Fork是将一个大任务拆分成若干个子任务,子任务分别去计算,而Join是获取到子任务的计算结果,然后合并,这个是递归的过程。子任务被分配到不同的核上执行时,效率最高。但是使用Fork/Join的时候,要注意一些事项
除了fork() 和 join()方法外,线程不得使用其他的同步工具。线程最好也不要sleep()
线程不得进行I/O操作
线程不得抛出checked exception
实现类
ForkJoinPool 实现了Fork/Join的线程池,原理则是实现了工作窃取算法ForkJoinTask ForkJoinTask代表一个可以并行、合并的任务,是一个抽象类,有两个子类
RecursiveTask 继承自ForkJoinTask,代表有返回值的任务
RecursiveAction 继承自ForkJoinTask,代表无返回值的任务
无返回值的示例
package wang.conge.javasedemo.core.thread; import java.util.Arrays; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class RecursiveActionTest { public static void main(String[] args) throws InterruptedException { //创建ForkJoinPool线程池,原理则是返回一个ForkJoinPool的实例 ForkJoinPool forkjoinPool = new ForkJoinPool(); int[] nums = {0,1,2,3,4,5,6,7,8}; forkjoinPool.submit(new NumPrintRecursiveAction(nums)); forkjoinPool.awaitTermination(2, TimeUnit.SECONDS); } static class NumPrintRecursiveAction extends RecursiveAction{ private static final long serialVersionUID = 1L; private int[] nums; public NumPrintRecursiveAction(int[] nums){ this.nums = nums; } @Override protected void compute() { //如果数组的length小于4,直接执行 if(nums.length<4){ for(int num:nums){ System.out.println(num); } return; } //如果当前数组的length大于4,分成两个任务继续执行 int mid = nums.length/2; int[] numsLeft = Arrays.copyOfRange(nums, 0, mid); int[] numsRight = Arrays.copyOfRange(nums, mid, nums.length); NumPrintRecursiveAction left = new NumPrintRecursiveAction(numsLeft); NumPrintRecursiveAction right = new NumPrintRecursiveAction(numsRight); //并发执行两个子任务 left.fork(); right.fork(); } } }
有返回值的示例
package wang.conge.javasedemo.core.thread; import java.util.Arrays; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; public class RecursiveActionTest { public static void main(String[] args) throws InterruptedException, ExecutionException { //创建ForkJoinPool线程池,原理则是返回一个ForkJoinPool的实例 ForkJoinPool forkjoinPool = new ForkJoinPool(); int[] nums = {0,1,2,3,4,5,6,7,8}; Future<Integer> future = forkjoinPool.submit(new NumCountRecursiveAction(nums)); System.out.println(future.get()); forkjoinPool.awaitTermination(2, TimeUnit.SECONDS); } static class NumCountRecursiveAction extends RecursiveTask<Integer>{ private static final long serialVersionUID = 1L; private int[] nums; public NumCountRecursiveAction(int[] nums){ this.nums = nums; } @Override protected Integer compute() { //如果数组的length小于4,直接执行 if(nums.length<4){ int total = 0; for(int num:nums){ total = total + num; } return total; } //如果当前数组的length大于4,分成两个任务继续执行 int mid = nums.length/2; int[] numsLeft = Arrays.copyOfRange(nums, 0, mid); int[] numsRight = Arrays.copyOfRange(nums, mid, nums.length); NumCountRecursiveAction left = new NumCountRecursiveAction(numsLeft); NumCountRecursiveAction right = new NumCountRecursiveAction(numsRight); //并发执行两个子任务 left.fork(); right.fork(); //汇总结果 return left.join() + right.join(); } } }
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应
比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
引用文章
并发和并行JDK 7 中的 Fork/Join 模式
Fork/Join框架介绍
相关文章推荐
- Java多线程(11) Fork-Join框架
- Java线程(十一):Fork/Join-Java并行计算框架
- java多线程之并行框架ForkJoin
- Java多线程 -- JUC包源码分析19 -- ForkJoinPool/ForkJoinTask
- Java多线程之Fork/Join框架基本使用(八)
- java多线程之并行框架ForkJoin
- java多线程之并行框架ForkJoin
- java多线程之并行框架ForkJoin
- Java多线程之~~~Fork/Join框架的同步和异步
- java多线程之并行框架ForkJoin
- java多线程之并行框架ForkJoin
- java多线程之并行框架ForkJoin
- java多线程解说【拾贰】_并发框架:Fork/Join
- java多线程之并行框架ForkJoin
- java多线程之并行框架ForkJoin
- 0041 Java学习笔记-多线程-线程池、ForkJoinPool、ThreadLocal
- Java对多线程~~~Fork/Join同步和异步帧
- Java 多线程任务分解-ForkJoinPool(带返回值)示例
- java多线程——8 fork/join
- java多线程之fork/join