Java Concurrency代码实例之一执行者与线程池
2017-05-04 22:34
405 查看
本文的读者应该是已经掌握了基本的Java多线程开发技巧,但不熟悉Java Concurrency包的程序员。
Concurrency包中提供的工具众多,用法各异,熟练掌握后大有裨益。本文对包中的各种工具类从概念和用法上进行了分类,并给出了具体的样例代码。
1. 执行者与线程池
2. 并发队列
3. 同步工具
4. 并发集合
5. 锁
6. 原子变量
本文介绍的就是第一类,执行者与线程池。
而Concurrency包出现之后,线程的创建还是依靠Thread、Runnable和Callable(新加入)对象的实例化;而线程的执行则靠Executor、ExecutorService的对象执行execute()方法或submit()方法;线程的调度则被固化为几个具体的线程池类,如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ExecutorCompletionService等等。这样表面上增加了复杂度,而实际上成功将线程的创建、执行和调度的业务逻辑分离,使程序员能够将精力集中在线程中业务逻辑的编写,大大提高了编码效率,降低了出错的概率,而且大大提高了性能。
Executor:执行者接口,所有执行者的父类;
ExecutorService:执行者服务接口,具体的执行者类都继承自此接口;
Executors:执行者工具类,大部分执行者的实例以及线程池都由它的工厂方法创建。
先看一个例子:
Executors工具类的newCachedThreadPool方法创建了一个执行者的对象,该对象是ExecutorService的实现类(后面我们会知道它是一个ThreadPoolExecutor对象)。Task是Runnable接口的实现类,代码中new出了三个Task对象,它们被提交给ExecutorService的execute方法作为参数,即三个线程被ExecutorService执行了,其调度机制隐含在执行者的具体类中。最后ExecutorService调用shutdown方法,阻止继续提交其他线程,并等待执行中的线程结束。
通过这种方式,线程的创建、执行和调度被分离了,程序员可以将精力集中在线程的内部业务中。
在Java Concurrency中,得到异步结果有了一套固定的机制,即通过Callable接口、Future接口和ExecutorService的submit方法来得到异步执行的结果,相关工具的介绍如下:
Callable:泛型接口,与Runnable接口类似,它的实例可以被另一个线程执行,内部有一个call方法,返回一个泛型变量V;
Future:泛型接口,代表依次异步执行的结果值,调用其get方法可以得到一次异步执行的结果,如果运算未完成,则阻塞直到完成;调用其cancel方法可以取消一次异步执行;
CompletionService:一种执行者,可将submit的多个任务的结果按照完成的先后顺序存入一个内部队列,然后可以使用take方法从队列中依次取出结果并移除,如果调用take时计算未完成则会阻塞。
先看一个简单的例子,得到一个异步执行的整形值:
接下来是一个使用CompletionService的例子,创建5个线程计算一些值,执行完成后使用CompletionService依次取出结果并打印:
需要注意的是CompletionService的创建方法,它的构造函数需要一个ExecutorService的对象作为参数。
ScheduledExecutorService:另一种执行者,可以将提交的任务延期执行,也可以将提交的任务反复执行。
ScheduledFuture:与Future接口类似,代表一个被调度执行的异步任务的返回值。
下面的例子中ScheduledExecutorService的实例scheduler调度了两个任务,第一个任务使用scheduleAtFixedRate()方法每隔一秒重复打印“beep”,第二个任务使用schedule()方法在10秒后延迟执行,它的作用是取消第一个任务,代码如下:
还有,使用最多的,替代Thread.sleep的这种用法:
原来Java的时间单位默认为毫秒,引入TimeUnit后增加了纳秒和微秒,精度更高了,同时引入了很多转换方法,便于使用。
在JDK1.8中,按照线程池的创建方法来看,应该有五种线程池,它们的创建方法如下所示:
而进一步查看源代码,这些方法最终都调用了ThreadPoolExecutor和ScheduledExecutorService的构造函数。而ScheduledExecutorService继承自ThreadPoolExecutor,因此最终所有线程池的构造函数都调用了ThreadPoolExecutor的如下构造函数:
请大家注意这五个参数(重点):
第一个参数corePoolSize,指明了线程池中应该保持的线程数量,即使线程处于空闲状态,除非设置了allowCoreThreadTimeOut这个参数;
第二个参数maximumPoolSize,线程池中允许的最大线程数,线程池中的线程一旦到达这个数,后续任务就会等待池中的线程空闲,而不会去创建新的线程了;
第三个参数keepAliveTime,当池中线程数量大于corePoolSize时,多出的线程在空闲时能够生存的最大时间,若一个线程空闲超过这个时间,它就会被终止并从池中删除;
第四个参数unit,指示第三个参数的时间单位;
第五个参数workQueue,存储待执行任务的阻塞队列,这些任务必须是Runnable的对象(如果是Callable对象,会在submit内部转换为Runnable对象)。
弄明白以上概念后,再来看这五种线程池:
单线程池:newSingleThreadExecutor()方法创建,五个参数分别是ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())。含义是池中保持一个线程,最多也只有一个线程,也就是说这个线程池是顺序执行任务的,多余的任务就在队列中排队。
固定线程池:newFixedThreadPool(nThreads)方法创建,五个参数分别是ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())。含义是池中保持nThreads个线程,最多也只有nThreads个线程,多余的任务也在队列中排队。
缓存线程池:newCachedThreadPool()创建,五个参数分别是ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())。含义是池中不保持固定数量的线程,随需创建,最多可以创建Integer.MAX_VALUE个线程(说一句,这个数量已经大大超过目前任何操作系统允许的线程数了),空闲的线程最多保持60秒,多余的任务在SynchronousQueue(所有阻塞、并发队列在后续文章中具体介绍)中等待。
单线程调度线程池:newSingleThreadScheduledExecutor()创建,五个参数分别是 (1, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())。含义是池中保持1个线程,多余的任务在DelayedWorkQueue中等待。
固定调度线程池:newScheduledThreadPool(n)创建,五个参数分别是 (n, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())。含义是池中保持n个线程,多余的任务在DelayedWorkQueue中等待。
先看第一个例子,测试单线程池、固定线程池和缓存线程池(注意增加和取消注释):
从运行结果可以看出,单线程池中的线程是顺序执行的。固定线程池(参数为2)中,永远最多只有两个线程并发执行。缓存线程池中,所有线程都并发执行。
第二个例子,测试单线程调度线程池和固定调度线程池。
从运行结果可以看出,单线程调度线程池和单线程池类似,而固定调度线程池和固定线程池类似。
总结:如果没有特殊要求,使用缓存线程池总是合适的;如果只能运行固定数量的线程,就使用固定线程池;如果只能运行一个线程,就使用单线程池。如果要运行调度任务,则按需使用调度线程池或单线程调度线程池。如果有其他特殊要求,则可以直接使用ThreadPoolExecutor类的构造函数来创建线程池,并自己给定那五个参数。
1.它对问题的解决思路是分而治之,先将一个问题fork(分为)几个子问题,然后子问题又分为孙子问题,直至细分为一个容易计算的问题,然后再将结果依次join(结合)为最终的答案。是不是感觉和云计算中的Map-reduce计算模型很像?思路是一样的,只不过fork-join运行在一个JVM中的多个线程内,而map-reduce运行在分布式计算节点上。
2.在运行线程时,它使用“work-steal”(任务偷取)算法。一般来说,fork-join会启动多个线程(由参数指定,若不指定则默认为CPU核心数量),每个线程负责一个任务队列,并依次从队列头部获得任务并执行。当某个线程空闲时,它会从其他线程的任务队列尾部偷取一个任务来执行,这样就保证了线程的运行效率达到最高。
3.它面向的问题域是可以大量并行执行的计算任务,例如计算某个大型数组中每个元素的平方(当然这个有些无趣),其计算对象最好是一些独立的元素,不会被其他线程访问,也没有同步、互斥要求,更不要涉及IO或者无限循环。当然此框架也可以执行普通的并发编程任务,但是这时就失去了性能优势。
4.细分的计算任务有一个粗略的优化标准,即可以在100~10000条指令中执行完毕。
了解以上思路后,来看看fork-join框架提供的几个工具类:
ForkJoinPool:支持fork-join框架的线程池,所有ForkJoinTask任务都必须在其中运行,线程池主要使用invoke()、invokeAll()等方法来执行任务,当然也可以使用原有的execute()和submit()方法;
ForkJoinTask:支持fork-join框架的任务抽象类,它是Future接口,它代表一个支持fork()和join()方法的任务;
RecursiveAction:ForkJoinTask的两个具体子类之一,代表没有返回值的ForkJoinTask任务;
RecursiveTask:ForkJoinTask的两个具体子类之一,代表有返回值的ForkJoinTask任务。
代码还是比较简单的,首先创建一个ForkJoinPool,然后new一个ComputeTask对象丢到线程池中运行。ComputeTask继承自RecursiveAction,实现了其compute()方法。这个方法中,当上界和下界的差小于一个值(这里是2)时,直接计算;否则创建两个子任务,并丢到线程池中。最值得注意的是compute()方法,这个方法里面体现了“分而治之”的思想,对程序员来说,叫递归思想更加合适。只不过普通的递归是在单线程中完成的,而这里的递归则把递归任务通过invokeAll()方法丢进了线程池中,让线程池来调度执行。运行结果是Time span = 3798。
再看看单线程的情况:
运行结果是Time span = 9354。
由于我的CPU是4核的,再看看4线程的情况:
运行结果是Time span = 4064。可以看出由于fork-join框架采用了任务偷取算法,比普通4线程快了一点点。
pool.invoke(task)将一个最初的任务扔进了线程池执行,这个任务将会执行它的compute()方法。在此方法中,若满足某个条件(例如数组上界和下界只差小于阈值THRESHOLD)则直接在这一段数组中查找最大值;若不满足条件,则找出中值mid,然后new出两个子任务lhs(left hand side)和rhs(right hand side),并调用invokeAll(lhs, rhs)将这两个子任务都扔进线程池执行。任务的join()方法会得到返回值,若任务尚未执行完毕则会在此阻塞。
通过这种编程模式,很好的将递归思想用到了多线程领域。值得注意的是,通过调整THRESHOLD可以增加或减少任务的个数,从而极大的影响线程的执行。在很多情况下,使用fork-join框架并不会比普通的多线程效率更高,甚至比单线程运行效率更低。因此,必须找到适合的场景,然后进行多次调优,才能获得性能的改进。
执行者与线程池的引入是因为Concurrency包的设计者想将线程的创建、执行和调度分离,从而使得用户能够更加专注于业务逻辑;Callable接口和Future接口使得异步执行结果的获取更加简单;ScheduledExecutorService取代Timer成为了线程重复和延迟执行的新标准;TimeUnit类的引入简化了时间段的表达工作;包中提供的五种线程池可以极大的满足程序员的各种需求,极端情况下还可以利用ThreadPoolExecutor类自己定制线程池。最后,从JDK1.7后引入的Fork-Join框架将“分而治之”的递归思想实现到线程池中,并应用“work-steal”算法实现了任务执行效率的提升。
Concurrency包中提供的工具众多,用法各异,熟练掌握后大有裨益。本文对包中的各种工具类从概念和用法上进行了分类,并给出了具体的样例代码。
1. 前言
在JDK1.4之前,Java多线程工具仅有Runnable、Thread、Timer以及synchronize关键字等寥寥无几的几个工具。随着多核处理器的普遍应用,Java并发的作用越来越重要,因此自JDK1.5后,Java参考了操作系统以及其他编程语言的并发理念,发明了许多的概念、方法和算法,并提供了相关的工具。这些工具极大的提高了Java并发编程的可靠性、安全性和性能,成为Java程序员必须掌握的利器之一。2. Concurrency包的分类
按照用途与特性,Concurrency包中包含的工具被分为六类(外加一个工具类TimeUnit),即:1. 执行者与线程池
2. 并发队列
3. 同步工具
4. 并发集合
5. 锁
6. 原子变量
本文介绍的就是第一类,执行者与线程池。
3. 执行者Executor的由来
在介绍具体的工具之前,先讲讲设计者的思路。在Java1.4之前,已经提供了Runnable接口、Thread类、Timer类和synchronize关键字,它们已经足以完成各种各样的多线程编程任务,为什么还要提供执行者这样的概念呢?这是因为Java的设计者想把线程的创建、执行和调度分离。在Concurrency包出现之前,线程的创建基本上靠new一个Thread对象,执行靠start()方法,而线程的调度则完全依赖程序员在具体代码中自己写出来。而Concurrency包出现之后,线程的创建还是依靠Thread、Runnable和Callable(新加入)对象的实例化;而线程的执行则靠Executor、ExecutorService的对象执行execute()方法或submit()方法;线程的调度则被固化为几个具体的线程池类,如ThreadPoolExecutor、ScheduledThreadPoolExecutor、ExecutorCompletionService等等。这样表面上增加了复杂度,而实际上成功将线程的创建、执行和调度的业务逻辑分离,使程序员能够将精力集中在线程中业务逻辑的编写,大大提高了编码效率,降低了出错的概率,而且大大提高了性能。
4. 工具介绍
4.1 线程执行者
这个功能主要由三个接口和类提供,分别是:Executor:执行者接口,所有执行者的父类;
ExecutorService:执行者服务接口,具体的执行者类都继承自此接口;
Executors:执行者工具类,大部分执行者的实例以及线程池都由它的工厂方法创建。
先看一个例子:
public class ExecutorExam { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); service.execute(new Task("task1")); service.execute(new Task("task2")); service.execute(new Task("task3")); service.shutdown(); } } class Task implements Runnable { private final String name; Task(String name) { this.name = name; } @Override public void run() { try { for (int i = 0; i < 5; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(name + "-[" + i + "]"); } } catch (InterruptedException e) { e.printStackTrace(); } } }
Executors工具类的newCachedThreadPool方法创建了一个执行者的对象,该对象是ExecutorService的实现类(后面我们会知道它是一个ThreadPoolExecutor对象)。Task是Runnable接口的实现类,代码中new出了三个Task对象,它们被提交给ExecutorService的execute方法作为参数,即三个线程被ExecutorService执行了,其调度机制隐含在执行者的具体类中。最后ExecutorService调用shutdown方法,阻止继续提交其他线程,并等待执行中的线程结束。
通过这种方式,线程的创建、执行和调度被分离了,程序员可以将精力集中在线程的内部业务中。
4.2 得到异步执行的结果
在Java1.4之前,如果要得到一个线程运行后产生的值,没有一套现成的机制,程序员可以通过Thread类的成员变量、程序的全局变量等方式来得到一个线程运行后产生的某个值。但是这样的话,你必须不断探测线程是否已经成功结束,或者运用同步技术来等待线程执行完成,再去获取异步执行的结果。在Java Concurrency中,得到异步结果有了一套固定的机制,即通过Callable接口、Future接口和ExecutorService的submit方法来得到异步执行的结果,相关工具的介绍如下:
Callable:泛型接口,与Runnable接口类似,它的实例可以被另一个线程执行,内部有一个call方法,返回一个泛型变量V;
Future:泛型接口,代表依次异步执行的结果值,调用其get方法可以得到一次异步执行的结果,如果运算未完成,则阻塞直到完成;调用其cancel方法可以取消一次异步执行;
CompletionService:一种执行者,可将submit的多个任务的结果按照完成的先后顺序存入一个内部队列,然后可以使用take方法从队列中依次取出结果并移除,如果调用take时计算未完成则会阻塞。
先看一个简单的例子,得到一个异步执行的整形值:
public class CallableExam { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); Future<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("Callable is running"); TimeUnit.SECONDS.sleep(2); return 47; } }); service.shutdown(); System.out.println("future.get = " + future.get()); } }
接下来是一个使用CompletionService的例子,创建5个线程计算一些值,执行完成后使用CompletionService依次取出结果并打印:
public class CompletionServiceExam { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService service = Executors.newCachedThreadPool(); CompletionService<Integer> completionService = new ExecutorCompletionService<>(service); for (int i = 0; i < 5; i++) { completionService.submit(new TaskInteger(i)); } service.shutdown(); //will block for (int i = 0; i < 5; i++) { Future<Integer> future = completionService.take(); System.out.println(future.get()); } } } class TaskInteger implements Callable<Integer> { private final int sum; TaskInteger(int sum) { this.sum = sum; } @Override public Integer call() throws Exception { TimeUnit.SECONDS.sleep(sum); return sum * sum; } }
需要注意的是CompletionService的创建方法,它的构造函数需要一个ExecutorService的对象作为参数。
4.3 重复执行和延期执行
在Java1.4之前,一般使用Timer来重复或者延期执行任务。Java Concurrency为了使之与Executor理念协调,引入了ScheduledExecutorService来完成同样的工作。ScheduledExecutorService:另一种执行者,可以将提交的任务延期执行,也可以将提交的任务反复执行。
ScheduledFuture:与Future接口类似,代表一个被调度执行的异步任务的返回值。
下面的例子中ScheduledExecutorService的实例scheduler调度了两个任务,第一个任务使用scheduleAtFixedRate()方法每隔一秒重复打印“beep”,第二个任务使用schedule()方法在10秒后延迟执行,它的作用是取消第一个任务,代码如下:
public class ScheduledExecutorServiceExam { public static void main(String[] args) { ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(2); ScheduledFuture<?> scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println("beep"); } }, 1, 1, TimeUnit.SECONDS); scheduler.schedule(new Runnable() { @Override public void run() { System.out.println("cancel beep"); scheduledFuture.cancel(true); scheduler.shutdown(); } }, 10, TimeUnit.SECONDS); } }
4.4 TimeUnit
上面的例子中用到了TimeUnit,因此这里就先介绍一下。TimeUnit是Java Concurrency包引入的新式的表达时间间隔或延迟的单位。在JDK1.5后面引入的新类中,都使用TimeUnit作为时间的表达方式。例如:lock.tryLock(50L, TimeUnit.MILLISECONDS) public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
还有,使用最多的,替代Thread.sleep的这种用法:
TimeUnit.SECONDS.sleep(5); TimeUnit有以下6个时间单位: NANOSECONDS:纳秒,1000纳秒为一微秒 MICROSECONDS:微秒,1000微秒为一毫秒 MILLISECONDS:毫秒,1000毫秒为一秒 SECONDS:秒,60秒为1分钟 MINUTES:分钟,60分钟为1小时 HOURS:小时,24小时为1天 DAYS:天
原来Java的时间单位默认为毫秒,引入TimeUnit后增加了纳秒和微秒,精度更高了,同时引入了很多转换方法,便于使用。
4.5 线程池
在前面的章节中我们已经多次使用了ExecutorService接口的对象,它的具体实现类主要有两个,分别是ThreadPoolExecutor和ScheduledExecutorService,它们都是某种线程池。在JDK1.8中,按照线程池的创建方法来看,应该有五种线程池,它们的创建方法如下所示:
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
而进一步查看源代码,这些方法最终都调用了ThreadPoolExecutor和ScheduledExecutorService的构造函数。而ScheduledExecutorService继承自ThreadPoolExecutor,因此最终所有线程池的构造函数都调用了ThreadPoolExecutor的如下构造函数:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
请大家注意这五个参数(重点):
第一个参数corePoolSize,指明了线程池中应该保持的线程数量,即使线程处于空闲状态,除非设置了allowCoreThreadTimeOut这个参数;
第二个参数maximumPoolSize,线程池中允许的最大线程数,线程池中的线程一旦到达这个数,后续任务就会等待池中的线程空闲,而不会去创建新的线程了;
第三个参数keepAliveTime,当池中线程数量大于corePoolSize时,多出的线程在空闲时能够生存的最大时间,若一个线程空闲超过这个时间,它就会被终止并从池中删除;
第四个参数unit,指示第三个参数的时间单位;
第五个参数workQueue,存储待执行任务的阻塞队列,这些任务必须是Runnable的对象(如果是Callable对象,会在submit内部转换为Runnable对象)。
弄明白以上概念后,再来看这五种线程池:
单线程池:newSingleThreadExecutor()方法创建,五个参数分别是ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())。含义是池中保持一个线程,最多也只有一个线程,也就是说这个线程池是顺序执行任务的,多余的任务就在队列中排队。
固定线程池:newFixedThreadPool(nThreads)方法创建,五个参数分别是ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())。含义是池中保持nThreads个线程,最多也只有nThreads个线程,多余的任务也在队列中排队。
缓存线程池:newCachedThreadPool()创建,五个参数分别是ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())。含义是池中不保持固定数量的线程,随需创建,最多可以创建Integer.MAX_VALUE个线程(说一句,这个数量已经大大超过目前任何操作系统允许的线程数了),空闲的线程最多保持60秒,多余的任务在SynchronousQueue(所有阻塞、并发队列在后续文章中具体介绍)中等待。
单线程调度线程池:newSingleThreadScheduledExecutor()创建,五个参数分别是 (1, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())。含义是池中保持1个线程,多余的任务在DelayedWorkQueue中等待。
固定调度线程池:newScheduledThreadPool(n)创建,五个参数分别是 (n, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue())。含义是池中保持n个线程,多余的任务在DelayedWorkQueue中等待。
先看第一个例子,测试单线程池、固定线程池和缓存线程池(注意增加和取消注释):
public class ThreadPoolExam { public static void main(String[] args) { //first test for singleThreadPool ExecutorService pool = Executors.newSingleThreadExecutor(); //second test for fixedThreadPool // ExecutorService pool = Executors.newFixedThreadPool(2); //third test for cachedThreadPool // ExecutorService pool = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { pool.execute(new TaskInPool(i)); } pool.shutdown(); } } class TaskInPool implements Runnable { private final int id; TaskInPool(int id) { this.id = id; } @Override public void run() { try { for (int i = 0; i < 5; i++) { System.out.println("TaskInPool-["+id+"] is running phase-"+i); TimeUnit.SECONDS.sleep(1); } System.out.println("TaskInPool-["+id+"] is over"); } catch (InterruptedException e) { e.printStackTrace(); } } }
从运行结果可以看出,单线程池中的线程是顺序执行的。固定线程池(参数为2)中,永远最多只有两个线程并发执行。缓存线程池中,所有线程都并发执行。
第二个例子,测试单线程调度线程池和固定调度线程池。
public class ScheduledThreadPoolExam { public static void main(String[] args) { //first test for singleThreadScheduledPool ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); //second test for scheduledThreadPool // ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2); for (int i = 0; i < 5; i++) { scheduledPool.schedule(new TaskInScheduledPool(i), 0, TimeUnit.SECONDS); } scheduledPool.shutdown(); } } class TaskInScheduledPool implements Runnable { private final int id; TaskInScheduledPool(int id) { this.id = id; } @Override public void run() { try { for (int i = 0; i < 5; i++) { System.out.println("TaskInScheduledPool-["+id+"] is running phase-"+i); TimeUnit.SECONDS.sleep(1); } System.out.println("TaskInScheduledPool-["+id+"] is over"); } catch (InterruptedException e) { e.printStackTrace(); } } }
从运行结果可以看出,单线程调度线程池和单线程池类似,而固定调度线程池和固定线程池类似。
总结:如果没有特殊要求,使用缓存线程池总是合适的;如果只能运行固定数量的线程,就使用固定线程池;如果只能运行一个线程,就使用单线程池。如果要运行调度任务,则按需使用调度线程池或单线程调度线程池。如果有其他特殊要求,则可以直接使用ThreadPoolExecutor类的构造函数来创建线程池,并自己给定那五个参数。
5. Fork-Join框架
在JDK1.7引入了一种新的并行编程模式“fork-join”,它是实现了“分而治之”思想的Java并发编程框架。网上关于此框架的各种介绍很多,本文从框架特点出发,通过几个例子来进行实用性的介绍。5.1 fork-join框架的特点
fork-join框架对新手来说很难理解,因此我先从它的特点说起,它有几个特点:1.它对问题的解决思路是分而治之,先将一个问题fork(分为)几个子问题,然后子问题又分为孙子问题,直至细分为一个容易计算的问题,然后再将结果依次join(结合)为最终的答案。是不是感觉和云计算中的Map-reduce计算模型很像?思路是一样的,只不过fork-join运行在一个JVM中的多个线程内,而map-reduce运行在分布式计算节点上。
2.在运行线程时,它使用“work-steal”(任务偷取)算法。一般来说,fork-join会启动多个线程(由参数指定,若不指定则默认为CPU核心数量),每个线程负责一个任务队列,并依次从队列头部获得任务并执行。当某个线程空闲时,它会从其他线程的任务队列尾部偷取一个任务来执行,这样就保证了线程的运行效率达到最高。
3.它面向的问题域是可以大量并行执行的计算任务,例如计算某个大型数组中每个元素的平方(当然这个有些无趣),其计算对象最好是一些独立的元素,不会被其他线程访问,也没有同步、互斥要求,更不要涉及IO或者无限循环。当然此框架也可以执行普通的并发编程任务,但是这时就失去了性能优势。
4.细分的计算任务有一个粗略的优化标准,即可以在100~10000条指令中执行完毕。
了解以上思路后,来看看fork-join框架提供的几个工具类:
ForkJoinPool:支持fork-join框架的线程池,所有ForkJoinTask任务都必须在其中运行,线程池主要使用invoke()、invokeAll()等方法来执行任务,当然也可以使用原有的execute()和submit()方法;
ForkJoinTask:支持fork-join框架的任务抽象类,它是Future接口,它代表一个支持fork()和join()方法的任务;
RecursiveAction:ForkJoinTask的两个具体子类之一,代表没有返回值的ForkJoinTask任务;
RecursiveTask:ForkJoinTask的两个具体子类之一,代表有返回值的ForkJoinTask任务。
5.2 RecursiveAction
先来看一个使用RecursiveAction的例子,这段代码的目的是计算一个大型数组中每个元素x的一个公式的值,这个公式是sin(x)+cos(x)+tan(x):public class RecursiveActionExam { private final static int NUMBER = 10000000; public static void main(String[] args) { double[] array = new double[NUMBER]; for (int i = 0; i < NUMBER; i++) { array[i] = i; } long startTime = System.currentTimeMillis(); System.out.println(Runtime.getRuntime().availableProcessors()); ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.invoke(new ComputeTask(array, 0, array.length)); long endTime = System.currentTimeMillis(); System.out.println("Time span = " + (endTime - startTime)); } } class ComputeTask extends RecursiveAction { final double[] array; final int lo, hi; ComputeTask(double[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } protected void compute() { if (hi - lo < 2) { for (int i = lo; i < hi; ++i) array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]); } else { int mid = (lo + hi) >>> 1; invokeAll(new ComputeTask(array, lo, mid), new ComputeTask(array, mid, hi)); } } }
代码还是比较简单的,首先创建一个ForkJoinPool,然后new一个ComputeTask对象丢到线程池中运行。ComputeTask继承自RecursiveAction,实现了其compute()方法。这个方法中,当上界和下界的差小于一个值(这里是2)时,直接计算;否则创建两个子任务,并丢到线程池中。最值得注意的是compute()方法,这个方法里面体现了“分而治之”的思想,对程序员来说,叫递归思想更加合适。只不过普通的递归是在单线程中完成的,而这里的递归则把递归任务通过invokeAll()方法丢进了线程池中,让线程池来调度执行。运行结果是Time span = 3798。
再看看单线程的情况:
public class RecursiveSequenceExam { private final static int NUMBER = 10000000; public static void main(String[] args) { double[] array = new double[NUMBER]; for (int i = 0; i < NUMBER; i++) { array[i] = i; } long startTime = System.currentTimeMillis(); for (int i = 0; i < NUMBER; i++) { array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]); } long endTime = System.currentTimeMillis(); System.out.println("Time span = " + (endTime - startTime)); } }
运行结果是Time span = 9354。
由于我的CPU是4核的,再看看4线程的情况:
public class Recusive4ThreadExam { private final static int NUMBER = 10000000; public static void main(String[] args) throws InterruptedException { double[] array = new double[NUMBER]; for (int i = 0; i < NUMBER; i++) { array[i] = i; } long startTime = System.currentTimeMillis(); ExecutorService service = Executors.newFixedThreadPool(4); service.execute(new ArrayTask(array, 0, NUMBER / 4)); service.execute(new ArrayTask(array, NUMBER / 4, NUMBER / 2)); service.execute(new ArrayTask(array, NUMBER / 2, NUMBER*3 / 4)); service.execute(new ArrayTask(array, NUMBER*3 / 4, NUMBER )); service.shutdown(); service.awaitTermination(1,TimeUnit.DAYS); long endTime = System.currentTimeMillis(); System.out.println("Time span = " + (endTime - startTime)); } } class ArrayTask implements Runnable { final double[] array; final int lo, hi; ArrayTask(double[] array, int lo, int hi) { this.array = array; this.lo = lo; this.hi = hi; } @Override public void run() { for (int i = lo; i < hi; ++i) array[i] = Math.sin(array[i]) + Math.cos(array[i]) + Math.tan(array[i]); } }
运行结果是Time span = 4064。可以看出由于fork-join框架采用了任务偷取算法,比普通4线程快了一点点。
5.3 RecursiveTask
下面来看一个更有意义的场景,寻找一个大型数组的最小值,这里我使用RecursiveTask(其实使用RecursiveAction也行,在它内部用一个成员变量保存结果即可)。代码如下:public class RecursiveFindMax { private static Random rand = new Random(47); private static final int NUMBER = 1000000; public static void main(String[] args) { double[] array = new double[NUMBER]; for (int i = 0; i < NUMBER; i++) { array[i] = rand.nextDouble(); } ForkJoinPool pool = new ForkJoinPool(); TaskFindMax task = new TaskFindMax(0, array.length - 1, array); pool.invoke(task); System.out.println("MaxValue = " + task.join()); } } class TaskFindMax extends RecursiveTask<Double> { private final int lo; private final int hi; private final double[] array; //you can change THRESHOLD to get better efficiency private final static int THRESHOLD = 10; TaskFindMax(int lo, int hi, double[] array) { this.lo = lo; this.hi = hi; this.array = array; } @Override protected Double compute() { if ((hi - lo) < THRESHOLD) { double max = array[lo]; for (int i = lo; i < hi; i++) { max = Math.max(max, array[i + 1]); } return max; } else { int mid = (lo + hi) >>> 1; TaskFindMax lhs = new TaskFindMax(lo, mid, array); TaskFindMax rhs = new TaskFindMax(mid, hi, array); invokeAll(lhs, rhs); return Math.max(lhs.join(), rhs.join()); } } }
pool.invoke(task)将一个最初的任务扔进了线程池执行,这个任务将会执行它的compute()方法。在此方法中,若满足某个条件(例如数组上界和下界只差小于阈值THRESHOLD)则直接在这一段数组中查找最大值;若不满足条件,则找出中值mid,然后new出两个子任务lhs(left hand side)和rhs(right hand side),并调用invokeAll(lhs, rhs)将这两个子任务都扔进线程池执行。任务的join()方法会得到返回值,若任务尚未执行完毕则会在此阻塞。
通过这种编程模式,很好的将递归思想用到了多线程领域。值得注意的是,通过调整THRESHOLD可以增加或减少任务的个数,从而极大的影响线程的执行。在很多情况下,使用fork-join框架并不会比普通的多线程效率更高,甚至比单线程运行效率更低。因此,必须找到适合的场景,然后进行多次调优,才能获得性能的改进。
6. 小结
本来准备在一篇文章中将Concurrency包中所有内容都介绍完,刚刚写了一个开头就知道自己过于自大了,因此本系统将分为六篇文章。执行者与线程池的引入是因为Concurrency包的设计者想将线程的创建、执行和调度分离,从而使得用户能够更加专注于业务逻辑;Callable接口和Future接口使得异步执行结果的获取更加简单;ScheduledExecutorService取代Timer成为了线程重复和延迟执行的新标准;TimeUnit类的引入简化了时间段的表达工作;包中提供的五种线程池可以极大的满足程序员的各种需求,极端情况下还可以利用ThreadPoolExecutor类自己定制线程池。最后,从JDK1.7后引入的Fork-Join框架将“分而治之”的递归思想实现到线程池中,并应用“work-steal”算法实现了任务执行效率的提升。
相关文章推荐
- (Java功能篇)java版本 线程池实例代码
- java中通用的线程池实例代码
- Java Concurrency代码实例之四-锁
- [转载]java 代码构建线程池的实例
- Java 定时器(Timer)及线程池里使用定时器实例代码
- java简单线程池实例代码
- Java Concurrency代码实例之二并发队列
- Java 线程池详解及实例代码
- Java Concurrency代码实例之六-ConcurrentHashMap
- Java 线程池ExecutorService详解及实例代码
- java简单线程池实例代码
- [学习笔记]Java代码构建一个线程池的自己学习写的实例,用这个你会更好的理解文章内容
- JAVA中关于树型图以及文件过滤器的基本实例代码(转贴请署名出处^_^)
- Java批注的发明起因及代码应用实例(1)
- 100行Java代码构建一个线程池
- 100行Java代码构建一个线程池(转载)
- JAVA分页代码实例
- 实例代码讲解Java连接Oracle数据库的各种方法
- Java实现嵌入式数据库(文本数据库)代码实例
- Java代码构建一个线程池