JAVA并发-Executor任务执行框架
2016-08-11 14:13
429 查看
首先介绍两个重要的接口,Executor和ExecutorService,定义如下:
为了配合使用上面的并发编程接口,有一个Executors工厂类,负责创建各类满足ExecutorService接口的线程池,具体如下:
newFixedThreadPool:创建一个固定长度的线程池,线程池中线程的数量从1增加到最大值后保持不变。如果某个线程坏死掉,将会补充一个新的线程。
newCachedThreadPool:创建长度不固定的线程池,线程池的规模不受限制,不常用。
newSingleThreadExecutor:创建一个单线程的Executor,他其中有一个线程来处理任务,如果这个线程坏死掉,将补充一个新线程。
newScheduledThreadPool:创建固定长度的线程池,以延时或定时的方式来执行任务。
下面是Executor和ExecutorService中常用方法的示例:
上面的ExecutorSerivce接口中的invokeAll(tasks)方法用于批量执行任务,并且将结果按照task列表中的顺序返回。此外,还存在一个批量执行任务的接口CompletionTask。ExecutorCompletionService是实现CompletionService接口的一个类,该类的实现原理很简单:
用Executor类来执行任务,同时把在执行任务的Future放到BlockingQueue<Future<V>>队列中。该类实现的关键就是重写FutureTask类的done()方法,FutureTask类的done()方法是一个钩子函数(关于钩子函数,请读者自行查询),done()方法在FutureTask任务被执行的时候被调用。
ExecutorCompletionService类的核心代码如下:
其中的done()方法定义如下:
ExecutorCompletionService的使用示例如下:
public interface Executor { void execute(Runnable command); }
public interface ExecutorService extends Executor { //不再接受新任务,待所有任务执行完毕后关闭ExecutorService void shutdown(); //不再接受新任务,直接关闭ExecutorService,返回没有执行的任务列表 List<Runnable> shutdownNow(); //判断ExecutorService是否关闭 boolean isShutdown(); //判断ExecutorService是否终止 boolean isTerminated(); //等待ExecutorService到达终止状态 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); //当task执行成功的时候future.get()返回result <T> Future<T> submit(Runnable task, T result); //当task执行成功的时候future.get()返回null Future<?> submit(Runnable task); //批量提交任务并获得他们的future,Task列表与Future列表一一对应 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; //批量提交任务并获得他们的future,并限定处理所有任务的时间 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; //批量提交任务并获得一个已经成功执行的任务的结果 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
为了配合使用上面的并发编程接口,有一个Executors工厂类,负责创建各类满足ExecutorService接口的线程池,具体如下:
newFixedThreadPool:创建一个固定长度的线程池,线程池中线程的数量从1增加到最大值后保持不变。如果某个线程坏死掉,将会补充一个新的线程。
newCachedThreadPool:创建长度不固定的线程池,线程池的规模不受限制,不常用。
newSingleThreadExecutor:创建一个单线程的Executor,他其中有一个线程来处理任务,如果这个线程坏死掉,将补充一个新线程。
newScheduledThreadPool:创建固定长度的线程池,以延时或定时的方式来执行任务。
下面是Executor和ExecutorService中常用方法的示例:
import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class Demo{ public static void main(String [] args){ //--------Executor示例------------// Executor s=Executors.newSingleThreadExecutor(); s.execute(new MyRunnableTask("1")); //--------ExecutorService示例------------// ExecutorService es=Executors.newFixedThreadPool(2); //--------get()示例------------// Future<String> future=es.submit(new MyCallableTask("10")); try{ System.out.println(future.get()); }catch(Exception e){} //--------get(timeout, timeunit)示例------------// future=es.submit(new MyCallableTask("11")); try{ System.out.println(future.get(500,TimeUnit.MILLISECONDS)); }catch(Exception e){ System.out.println("cancle because timeout"); } //--------invokeAll(tasks)示例------------// List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>(); for(int i=0;i<6;i++){ myCallableTasks.add(new MyCallableTask(i+"")); } try { List<Future<String>> results = es.invokeAll(myCallableTasks); Iterator<Future<String>> iterator=results.iterator(); while(iterator.hasNext()){ future=iterator.next(); System.out.println(future.get()); } } catch (Exception e) {} //--------invokeAll(tasks,timeout,timeunit))示例------------// try { //限定执行时间为2100ms,每个任务需要1000ms,线程池的长度为2,因此最多只能处理4个任务。一共6个任务,有2个任务会被取消。 List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS); Iterator<Future<String>> iterator=results.iterator(); while(iterator.hasNext()){ future=iterator.next(); if(!future.isCancelled()) System.out.println(future.get()); else System.out.println("cancle because timeout"); } } catch (Exception e) {} es.shutdown(); } } class MyRunnableTask implements Runnable{ private String name; public MyRunnableTask(String name) { this.name=name; } @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("runnable task--"+name); } } class MyCallableTask implements Callable<String>{ private String name; public MyCallableTask(String name) { this.name=name; } @Override public String call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) {} StringBuilder sb=new StringBuilder("callable task--"); return sb.append(name).toString(); } }
上面的ExecutorSerivce接口中的invokeAll(tasks)方法用于批量执行任务,并且将结果按照task列表中的顺序返回。此外,还存在一个批量执行任务的接口CompletionTask。ExecutorCompletionService是实现CompletionService接口的一个类,该类的实现原理很简单:
用Executor类来执行任务,同时把在执行任务的Future放到BlockingQueue<Future<V>>队列中。该类实现的关键就是重写FutureTask类的done()方法,FutureTask类的done()方法是一个钩子函数(关于钩子函数,请读者自行查询),done()方法在FutureTask任务被执行的时候被调用。
ExecutorCompletionService类的核心代码如下:
public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; }
其中的done()方法定义如下:
/** * Protected method invoked when this task transitions to state * <tt>isDone</tt> (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. */ protected void done() { }
ExecutorCompletionService的使用示例如下:
import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Demo{ public static void main(String [] args) throws InterruptedException, ExecutionException{ CompletionService<String> cs=new ExecutorCompletionService<String>( Executors.newFixedThreadPool(2)); for(int i=0;i<6;i++){ cs.submit(new MyCallableTask(i+"")); } for(int i=0;i<6;i++){ Future<String> future=cs.take(); //Retrieves and removes the Future representing the next completed task, //waiting if none are yet present. System.out.println(future.get()); } } } class MyCallableTask implements Callable<String>{ private String name; public MyCallableTask(String name) { this.name=name; } @Override public String call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) {} StringBuilder sb=new StringBuilder("callable task--"); return sb.append(name).toString(); } }
相关文章推荐
- Java Executor并发框架(七)Executor框架线程池任务执行全过程(下)
- Java Executor并发框架(六)Executor框架线程池任务执行全过程(上)
- java并发编程--Executor框架(一) 摘要: Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll(转)
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- Java并发编程:ScheduledExecutorService执行周期任务
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- Java Executor并发框架(九)Executor框架线程池ExecutorService.shutdown什么时候执行
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- java并发基础(三)--- 任务执行(Executor的使用)
- Java并发专题 带返回结果的批量任务执行 CompletionService ExecutorService.invokeAll
- Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,Completion
- Java Executor并发框架
- Java并发编程之线程管理(Executor框架12)
- java多线程并发executorservice(任务调度)类
- java并发编程-Executor框架
- Java并发之任务的描述和执行