ForkJoin 源码分析之ForkJoinPool
2016-04-04 20:55
337 查看
之前有学习过这个框架的使用,稍微回顾一下:
初始化一个ForkJoinPool
定义task或action
使用pool来执行task或action
这里初始化ForkJoinPool 采用如下方法:
首先分析一下初始化做了些什么。
这里调用了另外一个constructor.其中defaultForkJoinWorkerThreadFactory 这个是什么呢?
DefaultForkJoinWorkerThreadFactory 是一个静态内部类,其中返回了一个ForkJoinWorkerThread对象。ForkJoinWorkerThreadFactory 是一个interface。仅仅到这里的话,肯定会报错,因为defaultForkJoinWorkerThreadFactory 还没有初始化呢。继续查找代码,这个代码写的真孙子,ForkJoinPool 这个类最下面有一个static block。在这里初始化了一个defaultForkJoinWorkerThreadFactory .
到这里其实只是初始化了一个ForkJoinWorkerThread,还是要调用defaultForkJoinWorkerThreadFactory.newThread(ForkJoinPool pool)方法才行。
继续看被调用的constructor。在这个里面已经初始化了好多ForkJoinPool的字段了。
挨个看看字段的解释吧:
上面那段构造函数的代码,我主要想看这里:
这个并行化级别首先左移了1位,其实就是乘2了。源代码中有注释啊…尽量是2倍的线程数量。但是线程再多,也不能多过这个数:
到这里可以看出ForkJoinPool的初始化就干了2个事情:
初始化了一个线程池ForkJoinWorkerThread[parallelism..]
初始化了一个任务队列submissionQueue[8]。
初始化一个ForkJoinPool
定义task或action
使用pool来执行task或action
这里初始化ForkJoinPool 采用如下方法:
public final static ForkJoinPool mainPool = new ForkJoinPool();
首先分析一下初始化做了些什么。
public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), defaultForkJoinWorkerThreadFactory, null, false); }
这里调用了另外一个constructor.其中defaultForkJoinWorkerThreadFactory 这个是什么呢?
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; static class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } public static interface ForkJoinWorkerThreadFactory { /** * Returns a new worker thread operating in the given pool. * * @param pool the pool this thread works in * @throws NullPointerException if the pool is null */ public ForkJoinWorkerThread newThread(ForkJoinPool pool); }
DefaultForkJoinWorkerThreadFactory 是一个静态内部类,其中返回了一个ForkJoinWorkerThread对象。ForkJoinWorkerThreadFactory 是一个interface。仅仅到这里的话,肯定会报错,因为defaultForkJoinWorkerThreadFactory 还没有初始化呢。继续查找代码,这个代码写的真孙子,ForkJoinPool 这个类最下面有一个static block。在这里初始化了一个defaultForkJoinWorkerThreadFactory .
defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory();
到这里其实只是初始化了一个ForkJoinWorkerThread,还是要调用defaultForkJoinWorkerThreadFactory.newThread(ForkJoinPool pool)方法才行。
继续看被调用的constructor。在这个里面已经初始化了好多ForkJoinPool的字段了。
if (factory == null) throw new NullPointerException(); if (parallelism <= 0 || parallelism > MAX_ID) throw new IllegalArgumentException(); this.parallelism = parallelism; this.factory = factory; this.ueh = handler; this.locallyFifo = asyncMode; long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize workers array with room for 2*parallelism if possible int n = parallelism << 1; if (n >= MAX_ID) n = MAX_ID; else { // See Hackers Delight, sec 3.2, where n < (1 << 16) n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; } workers = new ForkJoinWorkerThread[n + 1]; this.submissionLock = new ReentrantLock(); this.termination = submissionLock.newCondition(); StringBuilder sb = new StringBuilder("ForkJoinPool-"); sb.append(poolNumberGenerator.incrementAndGet()); sb.append("-worker-"); this.workerNamePrefix = sb.toString();
挨个看看字段的解释吧:
/** * The target parallelism level. * 并行化级别,就是说可以有多少个线程在线程池中同时工作. */ final int parallelism; /** * Creation factory for worker threads. * 线程池的工厂,刚才上面分析过了... */ private final ForkJoinWorkerThreadFactory factory; /** * The uncaught exception handler used when any worker abruptly * terminates. * 经过刚刚的初始化后,这个参数是null */ final Thread.UncaughtExceptionHandler ueh; /** * True if use local fifo, not default lifo, for local polling * Read by, and replicated by ForkJoinWorkerThreads * 还没搞明白目前...但是可以确定的是这个参数初始化以后是false */ final boolean locallyFifo; // 这个参数太复杂...专题分析以后... volatile long ctl; /** * Array serving as submission queue. Initialized upon construction. * 提交任务的队列.. */ private ForkJoinTask<?>[] submissionQueue; /** * Arry holding all work threads in the pool * 其实就是线程池.... **/ ForkJoinWorkerThread[] workers;
上面那段构造函数的代码,我主要想看这里:
//initialize workers array with room for 2*parallelism if possible int n = parallelism << 1; if (n >= MAX_ID) n = MAX_ID; else { // See Hackers Delight, sec 3.2, where n < (1 << 16) n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; } workers = new ForkJoinWorkerThread[n + 1];
这个并行化级别首先左移了1位,其实就是乘2了。源代码中有注释啊…尽量是2倍的线程数量。但是线程再多,也不能多过这个数:
private static final int MAX_ID = 0x7fff; // max poolIndex
到这里可以看出ForkJoinPool的初始化就干了2个事情:
初始化了一个线程池ForkJoinWorkerThread[parallelism..]
初始化了一个任务队列submissionQueue[8]。
相关文章推荐
- 从源码安装Mysql/Percona 5.5
- 浅析Ruby的源代码布局及其编程风格
- asp.net 抓取网页源码三种实现方法
- JS小游戏之仙剑翻牌源码详解
- JS小游戏之宇宙战机源码详解
- jQuery源码分析之jQuery中的循环技巧详解
- 本人自用的global.js库源码分享
- java中原码、反码与补码的问题分析
- ASP.NET使用HttpWebRequest读取远程网页源代码
- PHP网页游戏学习之Xnova(ogame)源码解读(六)
- C#获取网页HTML源码实例
- PHP网页游戏学习之Xnova(ogame)源码解读(八)
- PHP网页游戏学习之Xnova(ogame)源码解读(四)
- JS小游戏之极速快跑源码详解
- JS小游戏之象棋暗棋源码详解
- android源码探索之定制android关机界面的方法
- 基于Android设计模式之--SDK源码之策略模式的详解
- Android游戏源码分享之2048
- C语言借助EasyX实现的生命游戏源码
- C实现的非阻塞方式命令行端口扫描器源码