Java并发编程——Executor接口及线程池的使用
2016-07-16 11:55
573 查看
在如今的程序里,单线程的程序,应该已经比较少了,而Java语言是内置支持多线程并发的,大家都说Java语言内置支持多线程,非常非常的强大和方便,但一直没有深入研究jdk内concurrent包。今天就认真学习了一下java.util.concurrent包,发现jdk多线程编程果然是强大和方便。本文是学习java.util.concurrent包内线程池及相关接口的一些总结。
Runnable接口只是简单地提供了一个任务运行的入口。但对于任务执行的结果以及任务的状态,都是没有定义的。但在jdk 1.5之后,Java针对多线程任务,提供了更强大的接口支持。那就是提供了Callable和Future接口。这两个接口,为Task提供了更多的抽象。可以更方便进行Task抽象。
1. cancel 用于取消任务,mayInterruptIfRunning参数表示,是否通过中断取消正在运行的任务,如果任务已经完成,则此方法返回false;
2. isCancelled 任务是否被取消,如果任务在正常完成前被取消,则返回true;
3. isDone 任务是否完成,无论任务是通过正常执行完,或者中途抛出异常,或者被取消,都认为任务已经完成;
4. get 获取任务执行结果,如果任务正在执行,则等待任务执行完成。get可以指定超时时间,如果超时,则抛出TimeoutException异常,如果任务被取消,则抛出CancellationException异常,如果任务在中途抛出异常,则get方法将异常封装在ExecutionException异常内,并抛出;如果当前线程被中断,则抛出InterruptedException异常。
Future接口定义如下:
Executor接口是基于生产者-消费者模式,生产者只需要调用execute方法提交任务,至于消费者什么时候在哪里执行方法是和生产者解耦的。
ExecutorService接口是继承Executor接口的,ExecutorService接口提供了更多更丰富的方法。主要方法如下:
1. submit submit方法通过重载,可以接受Callable任务对象和Runnable对象;submit方法返回一个Future对象。通过返回的Future对象,可以进行线程池任务状态的查询,以及取消任务。
2. invokeAll invokeAll方法可以进行任务的匹配提交,接受一个任务列表,同时可以可选提供一个超时时间;invokeAll方法返回一个Future对象列表。
3. isShutdown 线程池是否已经关闭
4. isTerminated 线程池所有任务是否已经执行完全。
5. shutdown 关闭线程池,线程池关闭后,则不能再接受任务。此方法只是使线程池不再接收新的任务,但已经提交的任务,仍然会继续运行。
6. shutdownNow 立刻关闭线程池,正在运行的任务会继续执行。未执行的任务将不会再执行,并将等待执行的任务返回。
7. awaitTermination 在指定时间内,等待线程池任务完成。
下面是定长线程池以及Future接口和Callable接口的代码示例:
上面的代码,执行结果如下:
scheduleAtFixedRate和scheduleWithFixedDelay方法的区别,主要是scheduleAtFixedRate方法的任务间隔是以任务起始时间算,scheduleWithFixedDelay方法是以任务结束时间算的。
上面代码,运行结果如下:
可以看出,ScheduledTask2任务,会在上次任务结束后,立马开始运行,而ScheduledTask3任务,则会在上次任务结束后,再等待5秒再运行,当executor线程池中,添加了一个新的阻塞任务后,则ScheduledTask2任务,会等待上个任务运行结束。同时,从上面的线程池名称可以看出,线程池中,线程的数量是固定的。
任务接口抽象
Runnable接口
在java.lang包内,为多线程提供了Runnable接口。public interface Runnable { public abstract void run(); }
Runnable接口只是简单地提供了一个任务运行的入口。但对于任务执行的结果以及任务的状态,都是没有定义的。但在jdk 1.5之后,Java针对多线程任务,提供了更强大的接口支持。那就是提供了Callable和Future接口。这两个接口,为Task提供了更多的抽象。可以更方便进行Task抽象。
Callable接口
Callable接口,虽然定义仍然很简单。但提供了Task运行的返回值,同时,也支持抛出异常。Callable接口的定义如下:public interface Callable<V> { V call() throws Exception; }
Future接口
Future接口,是对异步任务结果的抽象。Future接口可以查询任务执行结果,或者等待任务结果,并且可以获取任务的执行结果。Future接口有5个方法。1. cancel 用于取消任务,mayInterruptIfRunning参数表示,是否通过中断取消正在运行的任务,如果任务已经完成,则此方法返回false;
2. isCancelled 任务是否被取消,如果任务在正常完成前被取消,则返回true;
3. isDone 任务是否完成,无论任务是通过正常执行完,或者中途抛出异常,或者被取消,都认为任务已经完成;
4. get 获取任务执行结果,如果任务正在执行,则等待任务执行完成。get可以指定超时时间,如果超时,则抛出TimeoutException异常,如果任务被取消,则抛出CancellationException异常,如果任务在中途抛出异常,则get方法将异常封装在ExecutionException异常内,并抛出;如果当前线程被中断,则抛出InterruptedException异常。
Future接口定义如下:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Executor(执行器接口)
使用Thread和Runnable接口创建异步任务,线程和线程执行任务之间都是紧耦合的,同时,多个任务的执行策略,需要程序员自己进行繁琐地控制。为了将任务提交和任务执行之间进行解耦,Java在jdk1.5之后,提供了Executor接口。Executor接口定义如下:public interface Executor { void execute(Runnable command); }
Executor接口是基于生产者-消费者模式,生产者只需要调用execute方法提交任务,至于消费者什么时候在哪里执行方法是和生产者解耦的。
线程池概述
在Java的java.util.concurrent包内,通过Executors类的静态方法,主要提供了4种线程池。具体如下:方法 | 作用 |
---|---|
newFixedThreadPool | 创建一个定长的线程池,当提交一个任务就会创建一个线程,直到达到池子的最大长度,然后线程池内的线程数不会再变化,如果有线程由于异常Exception而结束,线程池会补充一个新的线程。 |
newCachedThreadPool | 创建一个可缓存的线程池,如果当前线程池内空闲线程过多,它可以灵活地回收空闲的线程,当需求增加时,它可以灵活地添加新的线程,而不会对池的长度作任何限制。 |
newSingleThreadExecutor | 创建一个单线程化的executor,并只创建唯一的工作线程来执行任务,如果这个线程异常结束,会另外创建一个取代它。executor会保证任务依照任务队列所规定的顺序(FIFO,LIFO,优先级)执行。 |
newScheduledThreadPool | 创建一个定长的线程池,而且支持定时的和周期性任务执行,类似于Timer。 |
ExecutorService接口
在上面四种线程池,newFixedThreadPool,newCachedThreadPool和newSingleThreadExecutor都是返回ExecutorService接口的对象。ExecutorService接口是继承Executor接口的,ExecutorService接口提供了更多更丰富的方法。主要方法如下:
1. submit submit方法通过重载,可以接受Callable任务对象和Runnable对象;submit方法返回一个Future对象。通过返回的Future对象,可以进行线程池任务状态的查询,以及取消任务。
2. invokeAll invokeAll方法可以进行任务的匹配提交,接受一个任务列表,同时可以可选提供一个超时时间;invokeAll方法返回一个Future对象列表。
3. isShutdown 线程池是否已经关闭
4. isTerminated 线程池所有任务是否已经执行完全。
5. shutdown 关闭线程池,线程池关闭后,则不能再接受任务。此方法只是使线程池不再接收新的任务,但已经提交的任务,仍然会继续运行。
6. shutdownNow 立刻关闭线程池,正在运行的任务会继续执行。未执行的任务将不会再执行,并将等待执行的任务返回。
7. awaitTermination 在指定时间内,等待线程池任务完成。
下面是定长线程池以及Future接口和Callable接口的代码示例:
package com.test.concurrent; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; class Job1 implements Callable<Boolean> { int id = 0; public Job1(int id ){ this.id = id; } public Boolean call() throws Exception { System.out.println("Job1 running.id:"+id+" Current time:"+System.currentTimeMillis()); return false; } } class Job2 implements Callable<Boolean> { int id =0; public Job2(int id ){ this.id = id; } public Boolean call() throws Exception { System.out.println("Job2 running.id:"+id+" Current time:"+System.currentTimeMillis()); Thread.sleep(15*1000); System.out.println("Job2 end.id:"+id+" Current time:"+System.currentTimeMillis()); return true; } } class Job3 implements Callable<Boolean> { int id =0; public Job3(int id ){ this.id = id; } public Boolean call() throws Exception { System.out.println("Job3 running.id:"+id+" Current time:"+System.currentTimeMillis()); throw new RuntimeException("Job3 throw exception."); } } public class ThreadPool { static void addTask() throws InterruptedException, ExecutionException { int id = 0; ExecutorService executor = Executors.newFixedThreadPool(2); Future<Boolean> future1 = executor.submit(new Job1(++id)); Future<Boolean> future2 = executor.submit(new Job2(++id)); Future<Boolean> future3 = executor.submit(new Job3(++id)); System.out.println("job1 is done:"+future1.isDone()); System.out.println("job1 result:"+future1.get()); System.out.println("job2 is done:"+future2.isDone()); System.out.println("job2 result:"+future2.get()); try { System.out.println("job3 result:"+future3.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); e.getCause(); } List<Callable<Boolean>> taskList = new ArrayList<Callable<Boolean>>(); taskList.add(new Job1(++id)); taskList.add(new Job2(++id)); List<Future<Boolean>> futures = executor.invokeAll(taskList); for (Future<Boolean> future : futures) { System.out.println(future.get()); } executor.shutdown(); executor.submit(new Job1(2)); } public static void main( String[] args ) throws InterruptedException, ExecutionException { addTask(); } }
上面的代码,执行结果如下:
Job1 running.id:1 Current time:1468632394429 Job2 running.id:2 Current time:1468632394430 job1 is done:true Job3 running.id:3 Current time:1468632394431 job1 result:false job2 is done:false Job2 end.id:2 Current time:1468632409431 job2 result:true java.util.concurrent.ExecutionException: java.lang.RuntimeException: Job3 throw exception. at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at com.test.concurrent.ThreadPool.addTask(ThreadPool.java:56) at com.test.concurrent.ThreadPool.main(ThreadPool.java:77) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) Caused by: java.lang.RuntimeException: Job3 throw exception. at com.test.concurrent.Job3.call(ThreadPool.java:40) at com.test.concurrent.Job3.call(ThreadPool.java:33) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.lang.Thread.run(Thread.java:745) Job1 running.id:4 Current time:1468632409434 Job2 running.id:5 Current time:1468632409434 Job2 end.id:5 Current time:1468632424435 false true Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@67854d1f rejected from java.util.concurrent.ThreadPoolExecutor@608a6351[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 5] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048) at com.test.concurrent.ThreadPool.addTask(ThreadPool.java:73) at com.test.concurrent.ThreadPool.main(ThreadPool.java:77)
ScheduledExecutorService接口
ScheduledExecutorService接口继承ExecutorService接口,但额外提供了三个用于周期调度任务的接口。方法 | 说明 |
---|---|
schedule(Callable callable, long delay, TimeUnit unit) | 提交任务,并且指定任务延迟执行的时间,此方法提交的任务,只会被执行一次 |
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) | 提交一个任务,同时指定延迟时间initialDelay,每隔任务开始period时间,如果,任务已经结束,则会再次调度任务;如果任务没有结束,则任务结束后,则会立刻调度。如果没有空闲进程,则仍旧会等待 |
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) | 提交任务,指定延迟时间initialDelay,每次任务结束后,再间隔delay时间,再次调度任务 |
package com.test.concurrent; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class ScheduledTask1 implements Runnable{ AtomicInteger index = new AtomicInteger(0); public void run() { int tmp_index = index.addAndGet(1); System.out.println("Thread name:"+Thread.currentThread().getName()+". Task1 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis()); try { Thread.sleep(10*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task1 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis()); } } class ScheduledTask2 implements Runnable{ AtomicInteger index = new AtomicInteger(100); public void run() { int tmp_index = index.addAndGet(1); System.out.println("Thread name:"+Thread.currentThread().getName()+". Task2 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis()); try { Thread.sleep(10*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task2 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis()); } } class ScheduledTask3 implements Runnable{ AtomicInteger index = new AtomicInteger(1000); public void run() { int tmp_index = index.addAndGet(1); System.out.println("Thread name:"+Thread.currentThread().getName()+". Task3 begin running. index:"+tmp_index+" Current time:"+System.currentTimeMillis()); try { Thread.sleep(10*1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Task3 end running. index:"+tmp_index+" Current time:"+System.currentTimeMillis()); } } public class ScheduledPool { static public void addTask() throws InterruptedException { ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); executor.schedule(new ScheduledTask1(), 20, TimeUnit.SECONDS); executor.scheduleAtFixedRate(new ScheduledTask2(), 0, 5, TimeUnit.SECONDS); ScheduledExecutorService executor2 = Executors.newScheduledThreadPool(1); executor2.scheduleWithFixedDelay(new ScheduledTask3(), 0, 5, TimeUnit.SECONDS); Thread.sleep(50*1000); executor.shutdown(); executor.shutdown(); } public static void main( String[] args ) throws InterruptedException { addTask(); } }
上面代码,运行结果如下:
Thread name:pool-1-thread-1. Task2 begin running. index:101 Current time:1468637711181 Thread name:pool-2-thread-1. Task3 begin running. index:1001 Current time:1468637711182 Task2 end running. index:101 Current time:1468637721183 Thread name:pool-1-thread-1. Task2 begin running. index:102 Current time:1468637721183 Task3 end running. index:1001 Current time:1468637721183 Thread name:pool-2-thread-1. Task3 begin running. index:1002 Current time:1468637726184 Task2 end running. index:102 Current time:1468637731184 Thread name:pool-1-thread-1. Task2 begin running. index:103 Current time:1468637731184 Task3 end running. index:1002 Current time:1468637736185 Task2 end running. index:103 Current time:1468637741185 Thread name:pool-1-thread-1. Task2 begin running. index:104 Current time:1468637741185 Thread name:pool-2-thread-1. Task3 begin running. index:1003 Current time:1468637741185 Task2 end running. index:104 Current time:1468637751185 Thread name:pool-1-thread-1. Task1 begin running. index:1 Current time:1468637751186 Task3 end running. index:1003 Current time:1468637751186 Thread name:pool-2-thread-1. Task3 begin running. index:1004 Current time:1468637756187 Task1 end running. index:1 Current time:1468637761186 Task3 end running. index:1004 Current time:1468637766187
可以看出,ScheduledTask2任务,会在上次任务结束后,立马开始运行,而ScheduledTask3任务,则会在上次任务结束后,再等待5秒再运行,当executor线程池中,添加了一个新的阻塞任务后,则ScheduledTask2任务,会等待上个任务运行结束。同时,从上面的线程池名称可以看出,线程池中,线程的数量是固定的。
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序
- 快速排序
- 二叉查找树