您的位置:首页 > 编程语言 > Java开发

Java并发编程-Executor框架之CompletionService

2016-06-08 21:02 543 查看
在前面我们已经学习过Executor框架之Callable和Future接口,我们知道利用list保存submit的callable任务所返回的Future对象。再在主线程中遍历这个list并调用Future的get()方法取到任务的返回值进行处理。Java通过另一个类CompletionService进行改进。

两者最主要的区别在于submit的任务不一定是按照加入自己维护的list顺序完成的。从list中遍历的每个Future对象并不一定处于完成状态,这时调用get()方法就会被阻塞住(也可以设置阻塞时间为0,进行不断的轮询避免这个问题),如果系统是设计成每个线程完成后就能根据其结果继续做后面的事,这样对于处于list后面的但是先完成的线程就会增加了额外的等待时间。

CompletionService用BlockingQueue来维护Future并和Executor组合在一起。Executor不停的执行任务,将任务执行完成的返回结果加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer,它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。实现任务的执行和返回结果处理的分离。

使用方法

构造方法ExecutorCompletionService(Executor executor)和ExecutorCompletionService(Executor executor,BlockingQueue completionQueue)分别将执行器和BlockingQueue加其中。

sumbit():提交任务,Executor执行任务。

take():从阻塞队列中曲执行结果Future对象。

poll():该方法与take类似,只是不会等待,如果没有满足要求,就返回null对象,也可以利用poll(long timeout, TimeUnit unit)设置等待时间。

使用实例

package MyThread;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceTest
{
public static void main(String[] args)
{
ExecutorService executor = Executors.newCachedThreadPool();
// 构建完成服务
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);
//构建10个生成者
for(int i=0;i<10;i++){
completionService.submit(new Producer());
}
//利用主线程作为一个消费者
for(int i=0;i<10;i++){
try {
int result = completionService.take().get();
System.out.println(Thread.currentThread().getName()+" Comsumer:"+result);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
executor.shutdown();
}
static class Producer implements Callable<Integer>{
@Override
public Integer call()  {
int ran=0;
try {
//休眠一段时间
ran = new Random().nextInt(1000);
Thread.sleep(ran);
System.out.println(Thread.currentThread().getName()+" Produce:"+ran);

} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return ran;
}

}

}


结果

pool-1-thread-6 Produce:50

main Comsumer:50

pool-1-thread-2 Produce:73

main Comsumer:73

pool-1-thread-7 Produce:98

main Comsumer:98

pool-1-thread-9 Produce:113

main Comsumer:113

pool-1-thread-8 Produce:316

main Comsumer:316

pool-1-thread-3 Produce:476

main Comsumer:476

pool-1-thread-10 Produce:594

main Comsumer:594

pool-1-thread-5 Produce:625

main Comsumer:625

pool-1-thread-1 Produce:637

main Comsumer:637

pool-1-thread-4 Produce:917

main Comsumer:917
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: