您的位置:首页 > 其它

ForkJoin 源码分析之ForkJoinPool

2016-04-04 20:55 337 查看
之前有学习过这个框架的使用,稍微回顾一下:

初始化一个ForkJoinPool

定义task或action

使用pool来执行task或action

这里初始化ForkJoinPool 采用如下方法:

public final static ForkJoinPool mainPool = new ForkJoinPool();


首先分析一下初始化做了些什么。

public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
defaultForkJoinWorkerThreadFactory, null, false);
}


这里调用了另外一个constructor.其中defaultForkJoinWorkerThreadFactory 这个是什么呢?

public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory;

static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}

public static interface ForkJoinWorkerThreadFactory {
/**
* Returns a new worker thread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if the pool is null
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}


DefaultForkJoinWorkerThreadFactory 是一个静态内部类,其中返回了一个ForkJoinWorkerThread对象。ForkJoinWorkerThreadFactory 是一个interface。仅仅到这里的话,肯定会报错,因为defaultForkJoinWorkerThreadFactory 还没有初始化呢。继续查找代码,这个代码写的真孙子,ForkJoinPool 这个类最下面有一个static block。在这里初始化了一个defaultForkJoinWorkerThreadFactory .

defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();


到这里其实只是初始化了一个ForkJoinWorkerThread,还是要调用defaultForkJoinWorkerThreadFactory.newThread(ForkJoinPool pool)方法才行。

继续看被调用的constructor。在这个里面已经初始化了好多ForkJoinPool的字段了。

if (factory == null)
throw new NullPointerException();
if (parallelism <= 0 || parallelism > MAX_ID)
throw new IllegalArgumentException();
this.parallelism = parallelism;
this.factory = factory;
this.ueh = handler;
this.locallyFifo = asyncMode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
// initialize workers array with room for 2*parallelism if possible
int n = parallelism << 1;
if (n >= MAX_ID)
n = MAX_ID;
else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
workers = new ForkJoinWorkerThread[n + 1];
this.submissionLock = new ReentrantLock();
this.termination = submissionLock.newCondition();
StringBuilder sb = new StringBuilder("ForkJoinPool-");
sb.append(poolNumberGenerator.incrementAndGet());
sb.append("-worker-");
this.workerNamePrefix = sb.toString();


挨个看看字段的解释吧:

/**
* The target parallelism level.
* 并行化级别,就是说可以有多少个线程在线程池中同时工作.
*/
final int parallelism;
/**
* Creation factory for worker threads.
* 线程池的工厂,刚才上面分析过了...
*/
private final ForkJoinWorkerThreadFactory factory;
/**
* The uncaught exception handler used when any worker abruptly
* terminates.
* 经过刚刚的初始化后,这个参数是null
*/
final Thread.UncaughtExceptionHandler ueh;
/**
* True if use local fifo, not default lifo, for local polling
* Read by, and replicated by ForkJoinWorkerThreads
* 还没搞明白目前...但是可以确定的是这个参数初始化以后是false
*/
final boolean locallyFifo;
// 这个参数太复杂...专题分析以后...
volatile long ctl;

/**
* Array serving as submission queue. Initialized upon construction.
* 提交任务的队列..
*/
private ForkJoinTask<?>[] submissionQueue;
/**
* Arry holding all work threads in the pool
* 其实就是线程池....
**/
ForkJoinWorkerThread[] workers;


上面那段构造函数的代码,我主要想看这里:

//initialize workers array with room for 2*parallelism if possible
int n = parallelism << 1;
if (n >= MAX_ID)
n = MAX_ID;
else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
workers = new ForkJoinWorkerThread[n + 1];


这个并行化级别首先左移了1位,其实就是乘2了。源代码中有注释啊…尽量是2倍的线程数量。但是线程再多,也不能多过这个数:

private static final int  MAX_ID     = 0x7fff;  // max poolIndex


到这里可以看出ForkJoinPool的初始化就干了2个事情:

初始化了一个线程池ForkJoinWorkerThread[parallelism..]

初始化了一个任务队列submissionQueue[8]。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  源码