Fork/join框架之ForkJoinPool
2013-08-25 17:23
399 查看
概述
jdk7新增了并发框架-fork/join框架,在这种框架下,ForkJoinTask代表一个需要执行的任务,真正执行这些任务的线程是放在一个线程池(ForkJoinPool)里面。ForkJoinPool是一个可以执行ForkJoinTask的ExcuteService,与ExcuteService不同的是它采用了work-stealing模式:所有在池中的线程尝试去执行其他线程创建的子任务,这样就很少有线程处于空闲状态,非常高效。池中维护着ForkJoinWorkerThread对象数组,数组大小由parallelism属性决定,parallelism默认为处理器个数
int n = parallelism << 1; if (n >= MAX_ID) n = MAX_ID;//MAX_ID=0x7fff else { n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; } workers = new ForkJoinWorkerThread[n + 1];可见线程个数不会超过0x7fff。
添加线程
什么情况下需要添加线程呢?当新的任务到来,线程池会通知其他线程前去处理,如果这时没有处于等待的线程或者处于活动的线程非常少(这是通过ctl属性来判断的),就会往线程池中添加线程。private void addWorker() { Throwable ex = null; ForkJoinWorkerThread t = null; try { t = factory.newThread(this); } catch (Throwable e) { ex = e; } if (t == null) { // null or exceptional factory return long c; // adjust counts do {} while (!UNSAFE.compareAndSwapLong (this, ctlOffset, c = ctl, (((c - AC_UNIT) & AC_MASK) | ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); // Propagate exception if originating from an external caller if (!tryTerminate(false) && ex != null && !(Thread.currentThread() instanceof ForkJoinWorkerThread)) UNSAFE.throwException(ex); } else t.start(); }添加线程的代码比较简单,通过工厂类创建一个线程,通过调用ForkJoinWorkerThread的run方法启动这个线程。如果失败则恢复ctl以前的值,并终止线程。工厂类直接调用其构造方法,最终添加线程其实是在registerWorker方法完成的
for (int g;;) { ForkJoinWorkerThread[] ws; if (((g = scanGuard) & SG_UNIT) == 0 && UNSAFE.compareAndSwapInt(this, scanGuardOffset, g, g | SG_UNIT)) { int k = nextWorkerIndex; try { if ((ws = workers) != null) { // ignore on shutdown int n = ws.length; if (k < 0 || k >= n || ws[k] != null) { for (k = 0; k < n && ws[k] != null; ++k) ; if (k == n) ws = workers = Arrays.copyOf(ws, n << 1); } ws[k] = w; nextWorkerIndex = k + 1; int m = g & SMASK; g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1); } } finally { scanGuard = g; } return k; } else if ((ws = workers) != null) { // help release others for (ForkJoinWorkerThread u : ws) { if (u != null && u.queueBase != u.queueTop) { if (tryReleaseWaiter()) break; } } }
这里有个属性scanGuard有必要提一下,从Guard这个词可以知道它是在保护什么,是在保护works这个数组。当需要更新这个数组时,通过不断地检查scanGuard来达到保护的目的。
整个框架大量采用顺序锁,好处是不用阻塞,不好的地方是会有额外的循环。这里也是通过循环来注册这个线程,在循环的过程中有两种情况发生:1、compareAndSwapInt操作成功;2、操作失败。
第一种情况:扫描workers数组,找到一个为空的项,并把新创建的线程放在这个位置;如果没有找到,表示数组大小不够,则将数组扩大2倍;
第二种情况:需要循环重新尝试直接成功为止,从代码中可以看出,即使是失败了,也不忘做一些额外的事:通知其他线程去执行没有完成的任务。
执行任务
除了从ExecutorService继承过来的execute和submit方法外,还对这两个方法进行了覆盖和重载。public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); forkOrSubmit(task); } public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = ForkJoinTask.adapt(task, null); forkOrSubmit(job); }对参数为Runnable的execute进行了加强,将Runnable这种普通任务适配成ForkJoinTask这种任务,然后做为参数传给forkOrSubmit方法统一处理。
private <T> void forkOrSubmit(ForkJoinTask<T> task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); else addSubmission(task); }从以上代码可以看出,这两种任务最终的归属还是不一样,ForkJoinTask这种任务被放到线程内部的队列里面,普通的Runnable任务被放到线程池的队列里面了。
除了通过调用execute方法外,对于ForkJoinTask任务通过调用fork方法也可以向自己所在的线程队列中添加一个任务。
终止线程池
和ExecutorService一样,可以调用shutdown()和 shutdownNow()来终止线程,会先设置每个线程的任务状态为CANCELLED,然后调用Thread的interrupt方法来终止每个线程。总结:ForkJoinPool就是一个ExcuteService,与ExcuteService不同的是:
1、ExcuteService执行Runnable/Callable任务,ForkJoinPool除了可以执行Runnable任务外,还可以执行ForkJoinTask任务;
2、ExcuteService中处于后面的任务需要等待前面任务执行后才有机会执行,而ForkJoinPool会采用work-stealing模式帮助其他线程执行任务,即ExcuteService解决的是并发问题,而ForkJoinPool解决的是并行问题。
下一节分析Fork/Join框架中的ForkJoinTask任务
相关文章推荐
- Fork/Join 框架 详解
- Java 理论与实践: 应用 fork-join 框架
- Java并发编程--Fork/Join框架使用
- Fork/Join-Java并行计算框架
- [转]Java7中的ForkJoin并发框架初探(中)——JDK中实现简要分析
- Java Thread&Concurrency(1): 深入理解Fork-Join并发执行框架
- Java7 ForkJoin 框架
- 并发编程之 Fork-Join 分而治之框架
- 并行编程之Fork/Join框架
- Java 7 Fork/Join 并行计算框架概览
- Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理
- Fork/Join框架简介
- 我的Java开发学习之旅------>Java使用Fork/Join框架来并行执行任务
- [转]Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用
- Java7中的ForkJoin并发框架初探(上)——需求背景和设计原理
- ForkJoinPool框架示例
- 学习笔记之Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用
- Java 理论与实践: 应用 fork-join 框架
- Fork/Join框架
- thread_fork/join并发框架1