您的位置:首页 > 编程语言 > Java开发

java 中的Fork/Join框架

2016-07-03 12:04 375 查看

什么是Fork/Join框架

Fork/Join框架是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:



涉及到的类主要有:

ForkJoinPool
:线程池,实现抽象类
AbstractExecutorService
(实现了
ExecutorService


负责维护全部的工作线程

接收调用者分配的task

本身持有一个全局的task队列

实现任务窃取

ForkJoinWorkerThread
:ForkJoinPool线程池中的worker线程,具体执行task。其中保存着对所在线程池的引用。

ForkJoinTask
接口。task的抽象。

RecursiveTask
:task执行完成后带返回值的task。

RecursiveAction
:不带返回值的task。

ForkJoin框架能满足的需求

如果一个任务的问题集能被拆分,并且组合多个子任务的结果就能获取结果,那么这个问题就适合使用ForkJoin框架解决问题。例如:从数组中查找最大数,划分为查找局部最大数;

工作窃取

ForkJoin核心点:工作窃取



工作窃取使得较空闲的线程可以帮助繁忙线程,而不是在空闲等待状态,让整个系统更快的解决问题集合。特别是每个线程处理的问题子集的大小是无法预估的情况下(这种情况下可能出现有些线程很繁忙,而有些比较空闲,在等待其它子任务完成才能算出最终结果。)

每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括
ArrayDeque
LinkedBlockingDeque
)。

标准队列和双端队列实现工作窃取对比:
可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。

跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个大任务(能够分解成多个小任务的任务),从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

关键属性

ForkJoinTask<?>[] submissionQueue; // java.util.concurrent.ForkJoinPool
// Pool的task队列 初始容量为8192 由于submissionQueue是环形队列 而作者使用了特殊的求余算法 导致的容量必须为2的幂次方
// 通过`java.util.concurrent.ForkJoinPool#growSubmissionQueue()`方法拓展 调用者线程submit的task都会提交到这个队列
// 然后唤醒worker去该队列steal task 如果在worker线程调用submit提交直接提交调用提交方法的worker线程的task队列中

ForkJoinTask<?>[] queue; // java.util.concurrent.ForkJoinWorkerThread
// Worker的task队列 初始容量为8192 基本上与submissionQueue的维护方式一样 不过只能通过在worker线程调用fork()才能将
// task添加到这个队列中

volatile int queueBase; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread
// 队列尾部索引 task窃取时会更改此值
//由于几个线程可能同时访问 所以修饰符是volatile

int queueTop; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread
// task push与pop时会更改的索引 根据上面对队列属性的描述会发觉所有入队操作都是由"一个线程"来完成的(调用者线程往pool中
// push task 而worker线程自己给自己fork task) 所以其是非线程安全的 另 submissionLock 保证了向pool中提交task的安全性
// 但是这个保护只是防止调用者作死而存在的(比如并发往pool中提交task) 如果保证调用者单线程入数据 则不需要这个锁

volatile int status; // java.util.concurrent.ForkJoinTask
// task执行的状态 对应本类的四个状态常量 已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)

框架入口

提交任务

由于ForkJoinPool实现了ExecutorService,也就是支持通过
submit
execute
两种方法提交任务。

submit
execute
方法中,可能需要使用
ForkJoinTask.adapt()
方法将
Runnable
Callable
方法包装成
ForkJoinTask
类型的Job。
核型提交由
forkOrSubmit
完成。

源码:

private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
// 已经被shutdown后则不再接收新的task 与传统线程池不同的是task最大数量是由ForkJoin自行管理的 外部不可更改
// 环形队列把这点限制死了 不过也不错
if ((t instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this)
w.pushTask(task);
// 如果提交task的线程是worker线程并且属于当前的pool 则直接将task添加到这个worker中
//(即:调用着直接将task提交到具体的worker线程,而不是提交给线程池)
// 这个方法由于面向具体的线程 所以不需要锁
else
addSubmission(task);
// 否则将task添加到pool队列中
}

如果当前线程是
ForkJoinWorkerThread
类型的,则将任务追加到
ForkJoinWorkerThread
对象中维护的队列上,否则将新的任务放入
ForkJoinPool
的提交队列中,并通知线程工作。

pushTask方法实现原理在下面谈ForkJoinTask的fork方法实现原理时一起说。

简化提交方式

public <T> T invoke(ForkJoinTask<T> task) {
Thread t = Thread.currentThread();
if (task == null)
throw new NullPointerException();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
return task.invoke();  // 如果提交task的线程对象是当前pool中的worker 则直接让当前worker自己处理task
else {
addSubmission(task);   // 所有非worker提交的task全部由pool保存
return task.join();  //等待task执行完毕
}
}

调用者线程流程

即调用了提交接口的线程,在task提交后的主要流程。
入口处不管是submit,execute方法提交task,还是使用invoke直接提交task,如果是提交给线程池,那么都会进入
addSubmission
方法。

private void addSubmission(ForkJoinTask<?> t) {
final ReentrantLock lock = this.submissionLock;
// 这里这个锁是防止调用者乱搞 task的生成大多都在worker中发生
//并且不会使用ForkJoinPool.addSubmission方法来实现生成task, 所以调用这个方法的只有调用者
lock.lock();
try {
ForkJoinTask<?>[] q; int s, m;
if ((q = submissionQueue) != null) {
long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
// 获取top对应的数组位置(内存地址)
UNSAFE.putOrderedObject(q, u, t); // 将新的task添加到队列中
queueTop = s + 1;//更新队列头的位置
if (s - queueBase == m)
growSubmissionQueue(); // 在添加之后检查队列长度是否到极限 如果是则扩容
}
} finally {
lock.unlock();
}
signalWork(); // 唤醒工作线程
}

方法结尾处会唤醒(或创建) worker线程 (创建用addWorker()) 而worker线程被创建之后 就会不断的调用scan方法去窃取其他worker或pool中的task 直到全部task结束 这里需要特别说明的地方有2点:

(s = queueTop) & (m = q.length-1)
,相当于
queueTop % (q.length - 1)
,返回的是队列下标(queueTop或queueBase)在环形队列数组中的真实位置,即数组中的Index。

(((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
通过直接访问指定内存地址来替换和获取元素。

查看ASHIFT和ABASE的来源:

static {
int s;
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class a = ForkJoinTask[].class;
ABASE = UNSAFE.arrayBaseOffset(a);
s = UNSAFE.arrayIndexScale(a);
} catch (Exception e) {
throw new Error(e);
}
if ((s & (s-1)) != 0)
throw new Error("data type scale not a power of two");
ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
}


此处的特殊的求余算法,就是为什么需要submissionQueue是2的整数幂次方 的原因!!!!
两个2的整数次幂方(x%y,其中x必须小余y) 余方法:
x&(y-1)


Java数组在实际存储时有一个对象头,后面才是实际的数组数据,而
UNSAFE.arrayBaseOffset
就是用来获取实际数组数据的偏移量
UNSAFE.arrayIndexScale
则是获取对应数组元素占的字节数。这里的代码ABASE=16(数组对象头大小),s=4(ForkJoinTask对象引用占用字节数),ASIFT=2。

所以上面的Index << ASHIFT + ABASE合起来就是Index左移2位=Index*4,也就是算Index的在数组中的偏移量,再加上ABASE就是Index在对象中的偏移量。也就是那一行代码主要就是算出来queueTop在队列数组中的实际偏移量。

nativa函数:
UNSAFE.putOrderedObject(q, u, t);

能够保证写写不会被重排序,但是不保证写会对其它线程可见;而volatile变量既保证写写不会被重排序,也保证写后对其它线程立即可见。可见Unsafe.putOrderedObject会比直接的volatile变量赋值速度会一点,这篇文章(需要翻墙)则指出Unsafe.putOrderedObject会比volatile写快3倍。

为什么要保证写不会重排序?线程安全性由subbmissionLock保证
因为只有保证了写不重排序,才能使用上面基于偏移的方式寻找queue中的元素地址。
那么为什么需要基于偏移的方式需找地址?这需要对比不使用上述方式插入task到submissionQueue。
如果自己实现,那么先根据queueTop找到下一个数组index,然后在数组中放入task。
总体来说,就是为了高效的插入task到数组中。[感觉理解不到位啊~,还有别的原因?]

从addSubmission源码中不难发现,addSubmission的核心是:
growSubmissionQueue
signalWork


growSubmissionQueue

growSubmissionQueue主要是完成扩容功能(当容量为0或者对象为null,则创建)。

/**
* Creates or doubles submissionQueue array.
* Basically identical to ForkJoinWorkerThread version.
*/
/**
* Creates or doubles submissionQueue array.
* Basically identical to ForkJoinWorkerThread version.
*/
private void growSubmissionQueue() {
ForkJoinTask[] oldQ = submissionQueue;
// 为null则初始化size,否则容量翻倍。
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;

ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
// 如果旧队列中有数据 则将其数据填充到新的队列数组中
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE; // 根据旧的mask获取元素在旧队列中的位置
Object x = UNSAFE.getObjectVolatile(oldQ, u); // 获取元素

// 获取后判断旧的队列中是否存在此元素 如果不存在则取消将其加入新的队列 因为在两个步骤之间
// task可能已经被执行过了 (其他线程或许会持有旧队列数组的引用) 反之 如果存在 则加入到新的队列中
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}

signalWork 唤醒线程

while为ture 的条件:(worker总数量少 | 有至少一个等待中)and (active状态的worker太少 | 线程池正在结束)。

e
是线程池控制字段,意义:

>0
:释放一个waiter,唤醒线程

=0
:没有等待创建的worker

<0
:线程池正在关闭

创建worker主要是
addWorker
完成。

/**
* Wakes up or creates a worker.
*/
final void signalWork() {
long c; int e, u;
while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
(INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
if (e > 0) {                         // release a waiting worker
int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
if ((ws = workers) == null ||
(i = ~e & SMASK) >= ws.length ||
(w = ws[i]) == null)
break;
long nc = (((long)(w.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
if (w.eventCount == e &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
w.eventCount = (e + EC_UNIT) & E_MASK;
if (w.parked)
UNSAFE.unpark(w);
break;
}
}
else if (UNSAFE.compareAndSwapLong
(this, ctlOffset, c,
(long)(((u + UTC_UNIT) & UTC_MASK) |
((u + UAC_UNIT) & UAC_MASK)) << 32)) {
addWorker();
break;
}
}
}

addWorker主要使用ForkJoinWorkerThreadFactory生成worker线程。
源码如下:

private void addWorker() {
Throwable ex = null;
ForkJoinWorkerThread t = null;
try {
t = factory.newThread(this);
// 创建worker线程 下面的代码不解释了 只是根据发生异常的情况来决定是否告知调用者
} catch (Throwable e) {
ex = e;
}
if (t == null) {  // null or exceptional factory return
long c;       // adjust counts
do {} while (!UNSAFE.compareAndSwapLong
(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// Propagate exception if originating from an external caller
if (!tryTerminate(false) && ex != null &&
!(Thread.currentThread() instanceof ForkJoinWorkerThread))
UNSAFE.throwException(ex);
}
else
t.start(); // 启动线程
}

worker线程流程

创建完线程后,线程直接启动了。Thread的
start()
会调用run()。

public void run() {
Throwable exception = null;
try {
onStart(); // 首先初始化当前worker线程
pool.work(this); // 调用pool将自己注册到pool中并表示自己可以开始工作
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
// 这个方法主要是将当前线程置为终结状态 在work方法种可以看到线程在获取task的时候是根据这个状态轮询的
// 一旦设置为false 就不再接收其他的task 另外也记录了这个线程发生的异常
}
}

初始化工作,主要初始化窃取目标:

protected void onStart() {
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // 初始化队列
int r = pool.workerSeedGenerator.nextInt(); // 初始化工作窃取的时候,要窃取的线程对象
seed = (r == 0) ? 1 : r; //  must be nonzero
}

每个worker线程基本上在调用到这个
work()
方法后,就会一直循环,重复着 工作窃取、执行线程本身队列task的过程除非在执行task过程中发生异常或者pool被shutdown 或者按需调整工作线程总数,导致该线程被回收。

final void work(ForkJoinWorkerThread w) {
boolean swept = false;                // true on empty scans
long c;

while (!w.terminate && (int)(c = ctl) >= 0) {
// 这个terminate就是上面onTermination(exeception)方法设置的状态位
int a;                            // active count
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a); // 扫描
else if (tryAwaitWork(w, c))
swept = false;
}
}

scan()
如果返回false,则会让
work()
方法中的循环继续调用scan;返回true则会调用
tryAwaitWork()
也就是等待task。迟早还会调用这个方法。

扫描发现其它任务,
scan方法输入参数

w
:当前worker。

a
:active状态的worker数量。

scan方法:

private boolean scan(ForkJoinWorkerThread w, int a) {

int g = scanGuard; // mask 0 avoids useless scans if only one active
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m)         // staleness check
return false;           // 安全检查
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
ForkJoinWorkerThread v = ws[k & m];
// 以下部分是task窃取的一部分 新启动的worker线程会通过work方法调用到scan方法并且传递自己的对象(也就是w)
// 到方法中 在下面的代码中他会尝试从现存的worker的队列中窃取一个task ForkJoin框架是在运行的期间不断的分裂
// 在ForkJoinTask的实现类里调用fork()就可能会创建新的worker 并走到这个分支 所以 在worker数不足的情况下
// 窃取的几率很高 但是当worker数稳定后 每个worker会给自己分配task 而不是再这样窃取其他线程的task

if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
// compareAndSwapObject将被窃取的位置置空,q的u位置上放入t,t原来的位置放入null

int d = (v.queueBase = b + 1) - v.queueTop; // 看 他窃取了 他窃取了!
v.stealHint = w.poolIndex;
if (d != 0)
signalWork();// d!=0说明有多个task没有完成,继续创建worker去窃取task
w.execTask(t);
}

r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
return false;                     // store next seed
}
else if (j < 0) {                     // xorshift
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
else
++k;
}
if (scanGuard != g)                       // staleness check
return false;
else {                                    // try to take submission
// 第一个task并不是直接被分配到worker的线程里(因为创建task的并不是worker本身) 而是直接进入pool的队列中
// 然后调用者线程会主动创建一个新的worker 在上面的逻辑(说实话我看不懂上边的逻辑) 中无法从其他worker中
// 窃取到task的时候 或者是其他worker分配的task已经执行完毕后 再从pool的队列中获取task

ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
w.execTask(t);
}
return false;
}

return true;                         // all queues empty
}
// 无论走了哪个分支 最终都会调用 w.execTask(t); (没task的情况下除外)
}

最后都会进入
executeTask


final void execTask(ForkJoinTask<?> t) {
currentSteal = t; // 当前窃取到的task 在join的时候有用
for (;;) {
if (t != null)
t.doExec();
// 执行具体的task 通常在编写forkjoin的时候 都会在运行期间分裂出其他task
if (queueTop == queueBase) // 判断自己的队列task是否已经全部完成 如果是则退出方法
// 这里需要特殊说明的是 只有当前线程会给自己的队列添加task 也就是说 当前线程如果不再fork task
// queueTop就不会发生变化 所以这个方法的判断是安全的
break;
t = locallyFifo ? locallyDeqTask() : popTask(); // 弹出task
// 这里唯一需要注意的是 ForkJoin支持FIFO(在外面设置) 如果设置了FIFO 就会跟其它"steal task"的线程一起
// 从queueBase开始获取task 顺带一提 locallyDeqTask有两个版本 一个是针对其他线程的steal task实现
// 另外一个是当前线程的实现
}
++stealCount;
currentSteal = null;
}

doExec()是执行task的基本方法,调用
JoinForkTask.exe()
实现具体的执行:

final void doExec() {
if (status >= 0) {
boolean completed;
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
return;
}
if (completed)
setCompletion(NORMAL); // must be outside try block
}
}

具体task是怎么执行的得看
JoinForkTask
的具体实现。在jdk中的默认抽象实现类的实现如下:

protected final boolean exec() {
result = compute();
return true;
}

调用了抽象的compute方法。该方法是一般使用jdk的ForkJoin框架的程序实现。

小结:
先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即
ForkJoinWorkerThread.execTask()
方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。

之后会尝试做任务窃取,尝试从其他线程中获取任务任务窃取条件不满足时,到提交队列中获取提交的任务

Task流程

最核心的方法是
fork
join


fork方法

ForkJoinTask的fork方法实现原理。当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的
pushTask
方法异步的执行这个任务,然后立即返回结果。
代码如下:

public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}

pushTask方法:

final void pushTask(ForkJoinTask t) {
ForkJoinTask[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t);
// 将task放入当前线程的队列中
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
pool.signalWork(); // 如果队列中的未处理task小于2 则唤醒新的worker
// task分解会一分为二,此时如果task <= 2说明可以尝试继续分解,唤醒的线程继续分解task。
else if (s == m)
growQueue(); // 扩容
}
}

worker获取task

popTask: (当前worker从自己的队列获取任务时,)从队头获取task。

private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) { // 轮循自己的队列 获取没有被窃取的task 是出栈的过程
// 通过工作窃取的索引和 当前worker自己弹出、压入队列的索引 来判断是否有剩余元素
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null)
// 获取task的时候如果task已经为空 则被其他线程窃取掉 此时break 将剩下的判断操作委托给外围的execTask处理
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) { // t位置 置为null 防止steal task的线程重复执行task
queueTop = s; // or putOrderedInt // 更新top
return t;
}
}
}
return null;
}

polltask:获取本地或者移除本地task | 窃取task

final ForkJoinTask<?> pollTask() {
ForkJoinWorkerThread[] ws;
ForkJoinTask<?> t = pollLocalTask(); // 尝试从自己的队列中获取task
if (t != null || (ws = pool.workers) == null)
return t;
int n = ws.length; // cheap version of FJP.scan
int steps = n << 1; // 这里限制了尝试从其他workersteal task的次数 看起来貌似是worker容器长度的2倍
int r = nextSeed();
int i = 0;
while (i < steps) {// 从其他的worker的task队列中steal task
ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];

// 先判断具体的worker队列中是否存在task 不存在则换到其他的worker
if (w != null && w.queueBase != w.queueTop && w.queue != null) {
// 如果窃取到task则返回 否则重置steps
if ((t = w.deqTask()) != null)
return t;
i = 0;
}
}
return null;
}

join方法与结果获取

join 方法:

public final V join() {
if (doJoin() != NORMAL)
// doJoin就是调用子类实现的exec方法然后根据运行状况 设置不同的状态位
// (比如发生异常设置一个状态 比如task取消是另外一个状态..)
return reportResult();
else
return getRawResult(); // 状态为NORMAL 的时候说明执行完成 直接返回结果即可
}

它调用了
doJoin()
方法,通过doJoin()方法得到当前任务的状态来判断任务完成情况,进而判断返回什么结果。
任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。

如果任务状态是已完成,则直接返回任务结果。

如果任务状态是被取消,则直接抛出
CancellationException


如果任务状态是抛出异常,则直接抛出对应的异常。

private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
// 如果调用join方法的线程为worker 则尝试让worker本身执行
if ((s = status) < 0) // 判断task是否已经得到结果 得到结果则直接返回
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
// unpushTask会检查queueTop位置上是不是当前task 如果是则直接执行
try {
completed = exec(); // 这里的流程基本上跟doExec()一样
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
}
return w.joinTask(this); // 调用当前worker线程的joinTask等待其安排task执行
}
else
return externalAwaitDone(); // 否则object wait
}

joinTask如下:

final int joinTask(ForkJoinTask<?> joinMe) {
// 这个替换有点类似方法调用 假设worker在执行task的时候执行了join() 被join的task同样也调用了join() 所以 像个栈..
ForkJoinTask<?> prevJoin = currentJoin;
currentJoin = joinMe;
for (int s, retries = MAX_HELP;;) {
if ((s = joinMe.status) < 0) {
currentJoin = prevJoin;
// 所以 当前task的join一旦有了结果 则将currentJoin替换回之前的task
return s;
}
if (retries > 0) {
if (queueTop != queueBase) {
if (!localHelpJoinTask(joinMe))
retries = 0;           // cannot help
// 检查当前queueTop位置上的task是否是被join的task 如果不是 并且task已经完成 则直接将retries置为0
// 进入下一轮循环 如果queueTop位置就是当前task 则执行 返回true后会重新进入for循环并返回执行结果
}
else if (retries == MAX_HELP >>> 1) {
--retries;                 // check uncommon case
if (tryDeqAndExec(joinMe) >= 0)
// 当前task队列中已经没有task 明显要被join的task已经被窃取 所以去其他worker的将task窃取回来并执行
Thread.yield();        // for politeness
}
else
retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
// 帮助其他的worker完成task 走到这个分支只能去steal task了
// (steal task的代码已经示范过多次 这里就不示范了..)
}
else {
retries = MAX_HELP;           // restart if not done
pool.tryAwaitJoin(joinMe);
}
}
// 这里需要注意的是 如果当前worker的task没有执行完毕 绝对不会去其他worker steal task
// 其次 每次循环的第一个判断是判断调用join()的task的状态是否为已完成 也就是说在这期间如果task已经被处理完了
// 则直接退出方法 不再尝试steal task 而这个task可能是由子task执行的 也可能是当前线程自己执行的
//(即完成了的task在join的时候是直接退出的)
}

总结整体流程

客户端程序实现
ForkJoinTask
,主要是实现
compute
方法。

使用
ForkJoinPool
提交任务。

首次需addSubmission,将task添加到
submissionQueue
。创建Worker进程,并启动。

worker线程使用scan获取队列中task,并最终调用
execTask
执行task。

execTask
会使用
popTask
将初始task弹出,然后调用
doExec
方法执行。

执行会最终调用自己实现的
compute
方法。

在compute方法会使用
fork
方法实现初始任务划分,将任务划分成小任务。

每个fork的任务会将自己放入当前线程的队列中,其实此时刚完成初始化分,两task在一个worker中,所以在同一个队列中。

由于队列的queue中,task数量小于2会执行
pool.singleWorker
创建/唤醒新的worker

此时前一个worker fork完成,在work方法的while循环中,继续执行任务(即步骤4中,在execTask会将本worker线程中所有task挨个doExec。);后一个worker窃取前一个worker的任务,并执行。

fork完成后,此时任务分工完成,达到预期细粒度。进入join。

进入join后,查询任务执行的情况,如果是NORMAL状态的,就调用
doJoin
,获取返回值,并且使用调用
TaskJoin
将task与其它task合并(可能会出现工作窃取,并继续执行)。

ForkJoin框架反复递归执行自定义的compute方法,每次调用都会fork,不断划分task,就像一个二叉树不断往下划分子树,最终到达叶节点。然后打到叶节点的开始回溯,使用join合并结果。和最开始的图一样。

划分和合并规则

当一个任务划分一个新线程时,它将自己推到 deque 的头部。

当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join() 的操作一样)。

当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。

参考

http://onlychoice.github.io/blog/2013/09/17/java-concurrent-source-code-reading-3/

jdk帮助文档
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: