
The Executor Framework: Related Classes and How They Work

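2017-06-06 13:21

Concurrent Java code frequently relies on the Executor framework, which schedules and runs Runnable and Callable tasks. This post summarizes several commonly used classes and how they work.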

Runnable

Runnable shows up almost everywhere in multithreaded code; even the Thread class implements it.

public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see     java.lang.Thread#run()
*/
public abstract void run();
}


Callable

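Callable serves much the same purpose as Runnable. The difference is that its call() method returns a value and may throw a checked exception, whereas Runnable's run() method cannot return a result to the caller.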

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
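
To make the difference concrete, here is a minimal sketch. The class name RunnableVsCallable and the helper methods are made up for illustration; both tasks are invoked directly rather than through a thread, just to show the two signatures side by side.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the JDK.
class RunnableVsCallable {
    // A Runnable cannot return a value, so the result goes through shared state.
    static int viaRunnable() {
        AtomicInteger holder = new AtomicInteger();
        Runnable task = () -> holder.set(6 * 7);
        task.run(); // called directly here; normally a thread or executor calls run()
        return holder.get();
    }

    // A Callable returns its result directly.
    static int viaCallable() {
        Callable<Integer> task = () -> 6 * 7;
        try {
            return task.call();
        } catch (Exception e) { // call() declares "throws Exception", so callers must handle it
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaRunnable()); // prints 42
        System.out.println(viaCallable()); // prints 42
    }
}
```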


Future

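Future represents the result of a Runnable or Callable task. It lets you cancel the task, check whether it was cancelled or has completed, and retrieve the result. The get() method blocks until the task has produced its result. Setting the result is not part of the Future interface; that is left to implementations such as FutureTask.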

public interface Future<V> {

/**
* Attempts to cancel execution of this task.  This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run.  If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns <tt>true</tt> if this task was cancelled before it completed
* normally.
*
* @return <tt>true</tt> if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns <tt>true</tt> if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* <tt>true</tt>.
*
* @return <tt>true</tt> if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
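
The methods above can be exercised with a short sketch. FutureDemo and its latch are illustrative assumptions; the latch only keeps the second task unfinished so that cancel() is guaranteed to succeed.

```java
import java.util.concurrent.*;

// Hypothetical demo class; not part of the JDK.
class FutureDemo {
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // First task finishes normally; get() blocks until the result is ready.
            Future<Integer> done = pool.submit(() -> 1 + 2);
            int value = done.get(5, TimeUnit.SECONDS);

            // Second task waits on the latch, so it cannot have completed when cancelled.
            Future<Integer> blocked = pool.submit(() -> { release.await(); return 0; });
            boolean cancelled = blocked.cancel(true);

            return value + " " + done.isDone() + " " + cancelled + " "
                    + blocked.isCancelled() + " " + blocked.isDone();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: 3 true true true true
    }
}
```

Note that isDone() is true for the cancelled task as well, which matches the Javadoc above: cancellation counts as completion.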


ThreadPoolExecutor

For this class, let's take a quick look at its constructor.

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
*        if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
*        pool
* @param keepAliveTime when the number of threads is greater than
*        the core, this is the maximum time that excess idle threads
*        will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
*        executed.  This queue will hold only the {@code Runnable}
*        tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
*        creates a new thread
* @param handler the handler to use when execution is blocked
*        because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
*         {@code corePoolSize < 0}<br>
*         {@code keepAliveTime < 0}<br>
*         {@code maximumPoolSize <= 0}<br>
*         {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
*         or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

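Most of the parameters are self-explanatory, so let's look at BlockingQueue.
It is an interface, and these are some commonly used implementations:
LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue, and DelayedWorkQueue (an internal nested class of ScheduledThreadPoolExecutor).
The first two are roughly analogous to LinkedList and ArrayList with blocking operations added. ArrayBlockingQueue always has a fixed capacity, while LinkedBlockingQueue is optionally bounded and defaults to a capacity of Integer.MAX_VALUE.
SynchronousQueue is not unbounded: it has no internal capacity at all. It is an unbuffered hand-off queue in which each insert must wait for another thread to take the element before the next one can be added.

DelayedWorkQueue adds a delay: an element can only be taken once its delay has expired.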
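
The capacity differences between these queues can be checked with the non-blocking offer() method. This is a small sketch; QueueCapacityDemo is a made-up name, and DelayedWorkQueue is left out because it is not accessible outside java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; not part of the JDK.
class QueueCapacityDemo {
    static boolean[] offers() {
        BlockingQueue<String> array = new ArrayBlockingQueue<>(1);
        BlockingQueue<String> linked = new LinkedBlockingQueue<>();
        BlockingQueue<String> sync = new SynchronousQueue<>();
        return new boolean[] {
            array.offer("a"),   // true: one free slot
            array.offer("b"),   // false: the bounded queue is now full
            linked.offer("a"),  // true: capacity defaults to Integer.MAX_VALUE
            sync.offer("a")     // false: no consumer is waiting and nothing is stored
        };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(offers())); // prints [true, false, true, false]
    }
}
```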

Generally, different kinds of thread pools are given different kinds of blocking queues.
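
How the core size, the queue, and the maximum size interact is easier to see with a deliberately tiny pool. The sketch below assumes one core thread, at most two threads, and a queue of capacity one; BoundedPoolDemo and the latch are illustrative names, and the latch simply keeps every task busy so the pool state is predictable.

```java
import java.util.concurrent.*;

// Hypothetical demo class; not part of the JDK.
class BoundedPoolDemo {
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                         // corePoolSize, maximumPoolSize
                30L, TimeUnit.SECONDS,        // keep-alive for the extra thread
                new ArrayBlockingQueue<>(1),  // bounded work queue
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // starts the core thread
            pool.execute(blocker); // core thread busy: waits in the queue
            pool.execute(blocker); // queue full: starts a second, non-core thread
            int threads = pool.getPoolSize();
            int queued = pool.getQueue().size();
            boolean rejected = false;
            try {
                pool.execute(blocker); // queue full and pool at maximum size
            } catch (RejectedExecutionException e) {
                rejected = true;       // AbortPolicy throws
            }
            return threads + " " + queued + " " + rejected;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: 2 1 true
    }
}
```

The order matters: new tasks go to the queue once the core threads are busy, and extra threads are only created when the queue refuses a task.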

/**
* Method invoked prior to executing the given Runnable in the
* given thread.  This method is invoked by thread {@code t} that
* will execute task {@code r}, and may be used to re-initialize
* ThreadLocals, or to perform logging.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.beforeExecute} at the end of
* this method.
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) { }

/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
*
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
*
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
*
*  <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
*   // ...
*   protected void afterExecute(Runnable r, Throwable t) {
*     super.afterExecute(r, t);
*     if (t == null && r instanceof Future<?>) {
*       try {
*         Object result = ((Future<?>) r).get();
*       } catch (CancellationException ce) {
*           t = ce;
*       } catch (ExecutionException ee) {
*           t = ee.getCause();
*       } catch (InterruptedException ie) {
*           Thread.currentThread().interrupt(); // ignore/reset
*       }
*     }
*     if (t != null)
*       System.out.println(t);
*   }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) { }

Overriding these two methods lets you add behavior around every task the pool runs.
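
As a rough illustration, the subclass below counts started and failed tasks. CountingPool and its fields are hypothetical names. The failing task is passed to execute(), so its exception reaches afterExecute; it also kills the worker thread, so the default handler prints a stack trace to stderr.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass for illustration; not part of the JDK.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        started.incrementAndGet();
        super.beforeExecute(t, r); // the Javadoc suggests calling super at the end
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);  // and at the beginning here
        if (t != null) {
            failed.incrementAndGet(); // non-null only for tasks that threw out of execute()
        }
    }

    static String run() {
        CountingPool pool = new CountingPool(1);
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.started.get() + " " + pool.failed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: 2 1
    }
}
```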

Executors

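Like the Collections class, this class bundles a set of commonly used, convenient methods. For example, it provides factory methods for several common kinds of thread pool.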

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed.  At any point,
* at most <tt>nThreads</tt> threads will be active processing
* tasks.  If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available.  If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks.  The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}


/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available.  These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to <tt>execute</tt> will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.)  Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* <tt>newFixedThreadPool(1)</tt> the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}


/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle.
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
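
A short usage sketch for two of these factories follows. FactoryDemo, sumOfSquares, and delayed are names invented here, and the pool size of 4 and the 50 ms delay are arbitrary choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Hypothetical demo class; not part of the JDK.
class FactoryDemo {
    // Fixed pool: run n independent tasks and add up their results.
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int k = i;
                tasks.add(() -> k * k);
            }
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks)) { // waits for all tasks
                sum += f.get();
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Scheduled pool: run a task once after a delay.
    static String delayed() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            ScheduledFuture<String> f = scheduler.schedule(() -> "done", 50, TimeUnit.MILLISECONDS);
            return f.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(4)); // prints 30
        System.out.println(delayed());       // prints done
    }
}
```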


FutureTask

public class FutureTask<V> implements RunnableFuture<V>

public interface RunnableFuture<V> extends Runnable, Future<V>

From the declarations above we can see that FutureTask implements both Runnable and Future.

/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param  callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

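It has two constructors, so you can pass either a Callable or a Runnable, but both end up as a Callable. The Runnable is converted through Executors.callable, which uses the adapter pattern.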
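
Both constructors can be tried without any thread pool, since a FutureTask is itself a Runnable. In this sketch FutureTaskDemo is a made-up name; one task runs on a plain Thread and the other on the calling thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class; not part of the JDK.
class FutureTaskDemo {
    static String run() {
        // Callable constructor: get() returns whatever call() returns.
        FutureTask<Integer> fromCallable = new FutureTask<Integer>(() -> 21 * 2);

        // Runnable constructor: get() returns the fixed result passed in.
        StringBuilder log = new StringBuilder();
        FutureTask<String> fromRunnable = new FutureTask<String>(() -> log.append("ran"), "fixed");

        new Thread(fromCallable).start(); // a FutureTask can be handed to a plain thread
        fromRunnable.run();               // or run directly on the calling thread
        try {
            return fromCallable.get() + " " + fromRunnable.get() + " " + log;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: 42 fixed ran
    }
}
```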

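From the interfaces it implements we can guess its characteristics: it needs a thread to run it, and it can return a result. Typically we create a thread pool and hand the FutureTask to it with submit() or execute(). This is a good place to note the difference between execute and submit: they accept different arguments and have different return types. execute takes only a Runnable and returns nothing, while submit accepts a Runnable or a Callable and returns a Future.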

void execute(Runnable command);


/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's <tt>get</tt> method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* <tt>result = exec.submit(aCallable).get();</tt>
*
* <p> Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's <tt>get</tt> method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's <tt>get</tt> method will
* return <tt>null</tt> upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
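
The four signatures above can be compared in one short sketch. ExecuteVsSubmit is an illustrative name. The last part shows one practical consequence of submit: an exception thrown by the task is stored in the Future and surfaces from get() wrapped in an ExecutionException.

```java
import java.util.concurrent.*;

// Hypothetical demo class; not part of the JDK.
class ExecuteVsSubmit {
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(() -> { });                               // Runnable only, returns nothing
            Future<Integer> value = pool.submit(() -> 7);          // Callable: get() yields 7
            Future<?> nothing = pool.submit(() -> { });            // Runnable: get() yields null
            Future<String> preset = pool.submit(() -> { }, "ok");  // Runnable plus a fixed result

            Callable<Integer> failing = () -> { throw new IllegalStateException("boom"); };
            String cause = "none";
            try {
                pool.submit(failing).get();
            } catch (ExecutionException e) {
                cause = e.getCause().getMessage(); // the task's own exception is the cause
            }
            return value.get() + " " + nothing.get() + " " + preset.get() + " " + cause;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints: 7 null ok boom
    }
}
```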


CompletionService

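Suppose you submit a batch of computations to an Executor and want to collect the results as they complete. You could keep the Future associated with each task and repeatedly call get with a timeout of 0, polling to see whether each task has finished. This works, but it is tedious. Fortunately there is a better way: a completion service, CompletionService.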

First look at its submit() methods, which accept either a Callable or a Runnable.

/**
* Submits a value-returning task for execution and returns a Future
* representing the pending results of the task.  Upon completion,
* this task may be taken or polled.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<V> submit(Callable<V> task);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task.  Upon completion, this task may be
* taken or polled.
*
* @param task the task to submit
* @param result the result to return upon successful completion
* @return a Future representing pending completion of the task,
*         and whose <tt>get()</tt> method will return the given
*         result value upon completion
* @throws RejectedExecutionException if the task cannot be
*         scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<V> submit(Runnable task, V result);

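CompletionService is itself an interface. A commonly used implementation is ExecutorCompletionService, whose constructor takes an Executor, usually a thread pool.

It also has the methods take() and poll(). take() blocks, while poll() returns null immediately if no task has completed yet.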

/**
* Retrieves and removes the Future representing the next
* completed task, waiting if none are yet present.
*
* @return the Future representing the next completed task
* @throws InterruptedException if interrupted while waiting
*/
Future<V> take() throws InterruptedException;

/**
* Retrieves and removes the Future representing the next
* completed task or <tt>null</tt> if none are present.
*
* @return the Future representing the next completed task, or
*         <tt>null</tt> if none are present
*/
Future<V> poll();

Both methods return a Future, from which you then get the task's result.
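
Here is a minimal sketch of submit() and take() together. CompletionOrderDemo is a hypothetical name, and the latch is only there to force the first-submitted task to finish last, so the completion order is predictable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Hypothetical demo class; not part of the JDK.
class CompletionOrderDemo {
    static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        CountDownLatch release = new CountDownLatch(1);
        List<String> order = new ArrayList<>();
        try {
            service.submit(() -> { release.await(); return "slow"; }); // submitted first
            service.submit(() -> "fast");                               // submitted second
            order.add(service.take().get()); // "fast": the slow task is still blocked
            release.countDown();             // now let the slow task finish
            order.add(service.take().get()); // "slow"
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [fast, slow]
    }
}
```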

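CompletionService Summary

1. If you keep the Futures in your own collection and loop over it calling get(), the main thread is not guaranteed to receive the result of the task that finished first. Results come back in submission order. Because get() blocks, when a later task has finished but an earlier one has not, the main thread simply waits; only once the earlier task completes does it discover that the later one was already done.

2. When CompletionService manages the results, the main thread always receives them in completion order, regardless of the order in which the tasks were submitted to the pool.

3. ExecutorCompletionService is implemented with a BlockingQueue of Futures. A Future is added to this queue only once its task has finished. take() is the consumer in a producer-consumer arrangement: it removes a Future from the queue and, if the queue is empty, blocks until a completed Future is added. Tasks that finish first are therefore taken first, which avoids unnecessary waiting.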
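
The mechanism in point 3 can be sketched in a few lines. This is a simplified illustration, not the JDK source: MiniCompletionService is a made-up class that relies on FutureTask's protected done() hook, which runs once the task has completed, to put the finished Future on the queue. The demo uses Runnable::run as a same-thread Executor purely to keep it deterministic.

```java
import java.util.concurrent.*;

// Simplified, hypothetical sketch of the idea; the real ExecutorCompletionService differs in detail.
class MiniCompletionService<V> {
    private final Executor executor;
    private final BlockingQueue<Future<V>> completed = new LinkedBlockingQueue<>();

    MiniCompletionService(Executor executor) {
        this.executor = executor;
    }

    Future<V> submit(Callable<V> task) {
        FutureTask<V> future = new FutureTask<V>(task) {
            @Override
            protected void done() {  // invoked when the task finishes, fails, or is cancelled
                completed.add(this); // producer side: only finished Futures enter the queue
            }
        };
        executor.execute(future);
        return future;
    }

    Future<V> take() throws InterruptedException {
        return completed.take();     // consumer side: blocks while the queue is empty
    }

    static String demo() {
        MiniCompletionService<String> service = new MiniCompletionService<>(Runnable::run);
        service.submit(() -> "first");
        try {
            return service.take().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints first
    }
}
```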