Java并发编程核心方法与框架-CompletionService的使用
2016-07-14 20:31
531 查看
接口CompletionService的功能是以异步的方式一边生产新的任务,一边处理已完成任务的结果,这样可以将执行任务与处理任务分离。使用submit()执行任务,使用take取得已完成的任务,并按照这些任务的时间顺序处理他们的结果。
使用CompletionService解决Future的缺点
控制台打印结果如下:
从打印结果来看,CompletionService解决了Future阻塞的缺点,哪个任务先执行完,哪个任务就先返回值。在CompletionService接口中如果当前没有任务执行完,则completionService.take().get()方法还是呈阻塞特性。
take()方法
打印结果如下:
take()方法是按照任务执行的速度,从快到慢获得Future对象
poll()方法
打印结果如下:
方法poll()返回的Future为null,因为当前没有任何已完成任务的Future对象。poll()不像take()方法具有阻塞的效果。
类CompletionService与异常
控制台打印结果如下:
MyCalableB类中虽然跑出了异常,但是并没有调用GutureTask类的get()方法,所以不出现异常。
对以上代码做如下修改:
此时运行结果如下:
MyCalableA先执行完,未抛出异常。MyCalableB线程抛出异常。
对以上代码做如下修改:
此时控制台输出结果如下:
此时B任务抛出异常,A任务执行完但未返回值。
对以上代码继续做如下修改:
此时运行结果如下:
未调用get()方法,未抛出异常。
继续修改以上代码:
控制台打印结果如下:
此时A任务有返回值。B任务抛出异常,无返回值。
继续修改以上代码:
运行结果如下:
此时任务B抛出异常,任务A未打印。
使用CompletionService解决Future的缺点
public class MyCallable implements Callable<String> { private String username; private long sleepValue; public MyCallable(String username, long sleepValue) { super(); this.username = username; this.sleepValue = sleepValue; } @Override public String call() throws Exception { System.out.println(username + " " + Thread.currentThread().getName() + System.currentTimeMillis()); Thread.sleep(sleepValue); return "return " + username; } public static void main(String[] args) { try { MyCallable callable1 = new MyCallable("username1", 5000); MyCallable callable2 = new MyCallable("username2", 4000); MyCallable callable3 = new MyCallable("username3", 3000); MyCallable callable4 = new MyCallable("username4", 2000); MyCallable callable5 = new MyCallable("username5", 1000); List<Callable<String>> callables = new ArrayList<>(); callables.add(callable1); callables.add(callable2); callables.add(callable3); callables.add(callable4); callables.add(callable5); int corePoolSize = 5; int maximumPoolSize = 10; int keepAliveTime = 5; TimeUnit unit = TimeUnit.SECONDS; LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(); ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); CompletionService completionService = new ExecutorCompletionService<>(executor); for (int i = 0; i < 5; i++) { completionService.submit(callables.get(i)); } for (int i = 0; i < 5; i++) { //take方法:获取并移除表示下一个已完成任务的Future,如果目前不存在这样的任务,则等待。 System.out.println(completionService.take().get() + " -- " + System.currentTimeMillis()); } } catch (Exception e) { e.printStackTrace(); } } }
控制台打印结果如下:
username2 pool-1-thread-21470920687933 username4 pool-1-thread-41470920687934 username3 pool-1-thread-31470920687933 username5 pool-1-thread-51470920687934 username1 pool-1-thread-11470920687933 return username5 -- 1470920688939 return username4 -- 1470920689937 return username3 -- 1470920690938 return username2 -- 1470920691938 return username1 -- 1470920692938
从打印结果来看,CompletionService解决了Future阻塞的缺点,哪个任务先执行完,哪个任务就先返回值。在CompletionService接口中如果当前没有任务执行完,则completionService.take().get()方法还是呈阻塞特性。
take()方法
public class Run { public static void main(String[] args) { try { ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); for (int i = 0; i < 10; i++) { completionService.submit(new Callable<String>() { @Override public String call() throws Exception { long sleepValue = (int)(Math.random() * 1000); System.out.println("sleep=" + sleepValue + " " + Thread.currentThread().getName()); Thread.sleep(sleepValue); return sleepValue + "-" + Thread.currentThread().getName(); } }); } for (int i = 0; i < 10; i++) { //take()方法是按照任务执行的速度,从快到慢获得Future对象 System.out.println(completionService.take().get()); } } catch (Exception e) { e.printStackTrace(); } } }
打印结果如下:
sleep=662 pool-1-thread-2 sleep=476 pool-1-thread-6 sleep=977 pool-1-thread-5 sleep=175 pool-1-thread-7 sleep=461 pool-1-thread-4 sleep=836 pool-1-thread-3 sleep=267 pool-1-thread-1 sleep=82 pool-1-thread-8 sleep=714 pool-1-thread-9 sleep=946 pool-1-thread-10 82-pool-1-thread-8 175-pool-1-thread-7 267-pool-1-thread-1 461-pool-1-thread-4 476-pool-1-thread-6 662-pool-1-thread-2 714-pool-1-thread-9 836-pool-1-thread-3 946-pool-1-thread-10 977-pool-1-thread-5
take()方法是按照任务执行的速度,从快到慢获得Future对象
poll()方法
public class Run1 { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); completionService.submit(new Callable<String>() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(3); System.out.println("3秒过了"); return "3S"; } }); //poll()方法获取并移除表示下一个已完成任务的Future, //如果不存在这样的任务则返回null,无阻塞效果 System.out.println(completionService.poll()); } }
打印结果如下:
null 3秒过了
方法poll()返回的Future为null,因为当前没有任何已完成任务的Future对象。poll()不像take()方法具有阻塞的效果。
类CompletionService与异常
public class MyCalableA implements Callable<String> { @Override public String call() throws Exception { System.out.println("MyCalableA begin," + System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1); System.out.println("MyCalableA end," + System.currentTimeMillis()); return "MyCalableA"; } } public class MyCalableB implements Callable<String> { @Override public String call() throws Exception { System.out.println("MyCalableB begin," + System.currentTimeMillis()); TimeUnit.SECONDS.sleep(2); int i = 0; if(i == 0){ throw new Exception("异常..."); } System.out.println("MyCalableB end," + System.currentTimeMillis()); return "MyCalableB"; } } public class Main { public static void main(String[] args) { try { MyCalableA myCalableA = new MyCalableA(); MyCalableB myCalableB = new MyCalableB(); Executor executor = Executors.newSingleThreadExecutor(); CompletionService<String> completionService = new ExecutorCompletionService<>(executor); completionService.submit(myCalableA); completionService.submit(myCalableB); for (int i = 0; i < 2; i++) { System.out.println("----" + completionService.take()); } System.out.println("main end"); } catch (Exception e) { e.printStackTrace(); } } }
控制台打印结果如下:
MyCalableA begin,1471006680464 MyCalableA end,1471006682467 MyCalableB begin,1471006682467 ----java.util.concurrent.FutureTask@3918d722 ----java.util.concurrent.FutureTask@dd41677 main end
MyCalableB类中虽然跑出了异常,但是并没有调用GutureTask类的get()方法,所以不出现异常。
对以上代码做如下修改:
for (int i = 0; i < 2; i++) { System.out.println("----" + completionService.take().get()); }
此时运行结果如下:
MyCalableA begin,1471007865642 MyCalableA end,1471007866647 MyCalableB begin,1471007866647 ----MyCalableA java.util.concurrent.ExecutionException: java.lang.Exception: 异常... at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at com.concurrent.chapter6.concurrent02.Main.main(Main.java:19) Caused by: java.lang.Exception: 异常... at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14) at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
MyCalableA先执行完,未抛出异常。MyCalableB线程抛出异常。
对以上代码做如下修改:
public class Main {
public static void main(String[] args) {
try {
MyCalableA myCalableA = new MyCalableA();
MyCalableB myCalableB = new MyCalableB();
Executor executor = Executors.newSingleThreadExecutor();//单线程
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
completionService.submit(myCalableB);//先执行B任务
completionService.submit(myCalableA);
for (int i = 0; i < 2; i++) { System.out.println("----" + completionService.take().get()); }
System.out.println("main end");
} catch (Exception e) {
e.printStackTrace();
}
}
}
此时控制台输出结果如下:
MyCalableB begin,1471008229367 MyCalableA begin,1471008231370 java.util.concurrent.ExecutionException: java.lang.Exception: 异常... at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at com.concurrent.chapter6.concurrent02.Main.main(Main.java:19) Caused by: java.lang.Exception: 异常... at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14) at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) MyCalableA end,1471008232373
此时B任务抛出异常,A任务执行完但未返回值。
对以上代码继续做如下修改:
public class Main { public static void main(String[] args) { try { MyCalableA myCalableA = new MyCalableA(); MyCalableB myCalableB = new MyCalableB(); Executor executor = Executors.newSingleThreadExecutor();//单线程 CompletionService<String> completionService = new ExecutorCompletionService<>(executor); completionService.submit(myCalableA);//先执行A任务 completionService.submit(myCalableB); for (int i = 0; i < 2; i++) { System.out.println("----" + completionService.poll()); } TimeUnit.SECONDS.sleep(5); System.out.println("A处:" + completionService.poll()); System.out.println("B处:" + completionService.poll()); System.out.println("main end"); } catch (Exception e) { e.printStackTrace(); } } }
此时运行结果如下:
----null ----null MyCalableA begin,1471009161165 MyCalableA end,1471009162166 MyCalableB begin,1471009162166 A处:java.util.concurrent.FutureTask@5f0ee5b8 B处:java.util.concurrent.FutureTask@4b0bc3c9 main end
未调用get()方法,未抛出异常。
继续修改以上代码:
public class Main { public static void main(String[] args) { try { MyCalableA myCalableA = new MyCalableA(); MyCalableB myCalableB = new MyCalableB(); Executor executor = Executors.newSingleThreadExecutor();//单线程 CompletionService<String> completionService = new ExecutorCompletionService<>(executor); completionService.submit(myCalableA);//先执行A任务 completionService.submit(myCalableB); for (int i = 0; i < 2; i++) { System.out.println("----" + completionService.poll()); } TimeUnit.SECONDS.sleep(5); System.out.println("A处:" + completionService.poll().get()); System.out.println("B处:" + completionService.poll().get()); System.out.println("main end"); } catch (Exception e) { e.printStackTrace(); } } }
控制台打印结果如下:
----null ----null MyCalableA begin,1471009511872 MyCalableA end,1471009512876 MyCalableB begin,1471009512877 A处:MyCalableA java.util.concurrent.ExecutionException: java.lang.Exception: 异常... at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at com.concurrent.chapter6.concurrent02.Main.main(Main.java:24) Caused by: java.lang.Exception: 异常... at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14) at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
此时A任务有返回值。B任务抛出异常,无返回值。
继续修改以上代码:
public class Main { public static void main(String[] args) { try { MyCalableA myCalableA = new MyCalableA(); MyCalableB myCalableB = new MyCalableB(); Executor executor = Executors.newSingleThreadExecutor();//单线程 CompletionService<String> completionService = new ExecutorCompletionService<>(executor); completionService.submit(myCalableB);//先执行B任务 completionService.submit(myCalableA); for (int i = 0; i < 2; i++) { System.out.println("----" + completionService.poll()); } TimeUnit.SECONDS.sleep(5); System.out.println("A处:" + completionService.poll().get()); System.out.println("B处:" + completionService.poll().get()); System.out.println("main end"); } catch (Exception e) { e.printStackTrace(); } } }
运行结果如下:
----null ----null MyCalableB begin,1471009732036 MyCalableA begin,1471009734037 MyCalableA end,1471009735037 java.util.concurrent.ExecutionException: java.lang.Exception: 异常... at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:188) at com.concurrent.chapter6.concurrent02.Main.main(Main.java:23) Caused by: java.lang.Exception: 异常... at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:14) at com.concurrent.chapter6.concurrent02.MyCalableB.call(MyCalableB.java:1) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
此时任务B抛出异常,任务A未打印。
相关文章推荐
- java的相关资料
- Java之@SuppressWarnings注解
- Java_day07
- Java并发编程核心方法与框架-Future和Callable的使用
- Java并发编程核心方法与框架-TheadPoolExecutor的使用
- 1-100之间所有的素数
- Solaris 安装JDK
- Java并发编程核心方法与框架-Executors的使用
- spring集成mina,包含心跳检测,实现服务端主动推送
- Java并发编程核心方法与框架-phaser的使用
- MVC中的视图层框架,Struts2的使用
- Java并发编程核心方法与框架-CyclicBarrier的使用
- Java并发编程核心方法与框架-CountDownLatch的使用
- Java并发编程核心方法与框架-exchanger的使用
- 尚学堂 JAVA DAY12 概念总结
- Java并发编程核心方法与框架-Semaphore的使用
- 安装Eclipse并配置JacORB插件
- Spring Boot实战之入门
- RxJava中的Subject和常见的生命周期管理
- java 一个函数如何返回多个值