您的位置:首页 > 编程语言 > Java开发

java并发 java7之fork-join框架

2014-08-11 15:22 295 查看

如果让我个人理解什么是fork-join,我立刻会想到hadoop的map/reduce。他们是同一种模型。更早之前,笔者就看过关于fork-join的相关文章,但是理解的都不够深刻。自从深入研究了java多线程这块的相关技术,再回头来看fork-join,觉得真是个了不起的技术。

概述

fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。  

类似于ExecutorService接口的其他实现,fork/join框架会将任务分发给线程池中的工作线程。fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。 fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

Divide and conquer

合并排序是 divide-and-conquer 算法的一个例子,在这种算法中将一个问题递归分解成子问题,再将子问题的解决方案组合得到最终结果。 divide-and-conquer 算法也可用于顺序环境中,但是在并行环境中更加有效,因为可以并行处理子问题。

并行 divide-and-conquer 算法首先对问题进行评估,确定其大小是否更适合使用顺序解决方案;通常,可通过将问题大小与某个阙值进行比较完成。如果问题大到需要并行分解,算法会将其分解成两个或更多子问题,并以并行方式对子问题递归调用算法本身,然后等待子问题的结果,最后合并这些结果。用于选择顺序和并行执行方法的理想阙值是协调并行任务的成本。如果协调成本为 0,更多的更细粒度的任务会提供更好的并行性;在需要转向顺序方法之前,协调成本越低,就可以划分更细粒度的任务。

递归(旧的方法)

如果从一个数组中,选一个最大值。我一般都会采用递归调用。这样的话只有当前线程的一个方法是运行的,其他方法都阻塞在线程栈中。所以在多核情况下,其他CPU都是空闲,没有得到充分利用。

并行计算(fork-join)

类似hadoop的map/reduce,可以将任务拆分成多个块,然后最终将这些子结果集合并成最终结果集。它的行为表现为当前任务是暂停的,并行执行两个子任务,而当前任务等待两个子任务的完成。然后就可以将两个子任务的结果进行合并。这种并行分解方法常常称作 fork-join,因为执行一个任务将首先分解(fork)为多个子任务,然后再合并(join)(完成后)。

使用fork/join框架的第一步是编写执行一部分工作的代码。你的代码结构看起来应该与下面所示的伪代码类似:

Java代码


 




if (当前这个任务工作量足够小)  
    直接完成这个任务  
else  
    将这个任务或这部分工作分解成两个部分  
    分别触发(invoke)这两个子任务的执行,并等待结果  

if (当前这个任务工作量足够小)
直接完成这个任务
else
将这个任务或这部分工作分解成两个部分
分别触发(invoke)这两个子任务的执行,并等待结果


示例代码如下:求

Java代码


 




import java.util.concurrent.ForkJoinPool;  
import java.util.concurrent.RecursiveTask;  
  
/** 
 * @author piaohailin 
 * @date   2014-3-26 
*/  
public class Fibonacci extends RecursiveTask<Long> {  
    private static final long serialVersionUID = 7875142223684511653L;  
    private final long        n;  
  
    Fibonacci(long n) {  
        this.n = n;  
    }  
  
    protected Long compute() {  
        if (n <= 1) {  
            return n;  
        }  
        Fibonacci f1 = new Fibonacci(n - 1);  
        f1.fork();  
        Fibonacci f2 = new Fibonacci(n - 2);  
        return f2.compute() + f1.join();  
    }  
  
    public static void main(String[] args) {  
        Fibonacci task = new Fibonacci(10);  
        ForkJoinPool pool = new ForkJoinPool(4);  
  
        pool.invoke(task);  
        System.out.println(task.getRawResult());  
    }  
  
}  

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
* @author piaohailin
* @date   2014-3-26
*/
public class Fibonacci extends RecursiveTask<Long> {
private static final long serialVersionUID = 7875142223684511653L;
private final long        n;

Fibonacci(long n) {
this.n = n;
}

protected Long compute() {
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
return f2.compute() + f1.join();
}

public static void main(String[] args) {
Fibonacci task = new Fibonacci(10);
ForkJoinPool pool = new ForkJoinPool(4);

pool.invoke(task);
System.out.println(task.getRawResult());
}

}


工作窃取

fork-join 框架通过一种称作工作窃取(work stealing) 的技术减少了工作队列的争用情况。每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括 ArrayDeque 和 LinkedBlockingDeque)。当一个任务划分一个新线程时,它将自己推到 deque 的头部。当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join()
的操作一样)。当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。

可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个能够分解成多个小任务的任务,从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。





标准实现

除了能够使用fork/join框架来实现能够在多处理系统中被并行执行的定制化算法(如前文中的ForkBlur.java例子),在Java SE中一些比较常用的功能点也已经使用fork/join框架来实现了。在Java SE 8中,java.util.Arrays类的一系列parallelSort()方法就使用了fork/join来实现。这些方法与sort()系列方法很类似,但是通过使用fork/join框架,借助了并发来完成相关工作。在多处理器系统中,对大数组的并行排序会比串行排序更快。这些方法究竟是如何运用fork/join框架并不在本教程的讨论范围内。想要了解更多的信息,请参见Java
API文档。 其他采用了fork/join框架的方法还包括java.util.streams包中的一些方法,此包是作为Java SE 8发行版中Project Lambda的一部分。想要了解更多信息,请参见Lambda Expressions一节。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息