您的位置:首页 > 其它


2014-06-08 13:09 323 查看



* The lazily constructed LazyThreadPool instance.
private LazyThreadPool lazyThreadPool;


* Shut down the {@link ThreadPool}. After this returns
* {@link ThreadPool#submit(TimedCancelable)} will return null.
* @param interrupt {@code true} if the threads executing tasks task should
*        be interrupted; otherwise, in-progress tasks are allowed to complete
*        normally.
* @param waitMillis maximum amount of time to wait for tasks to complete.
* @return {@code true} if all the running tasks terminated and
*         {@code false} if the some running task did not terminate.
* @throws InterruptedException if interrupted while waiting.
synchronized boolean shutdown(boolean interrupt, long waitMillis)
throws InterruptedException {
isShutdown = true;
if (lazyThreadPool == null) {
return true;
} else {
return lazyThreadPool.shutdown(interrupt, waitMillis);

* Return a LazyThreadPool.
private synchronized LazyThreadPool getInstance() {
if (lazyThreadPool == null) {
lazyThreadPool = new LazyThreadPool();
return lazyThreadPool;


* Submit a {@link Cancelable} for execution and return a
* {@link TaskHandle} for the running task or null if the task has not been
* accepted. After {@link ThreadPool#shutdown(boolean, long)} returns this
* will always return null.
public TaskHandle submit(Cancelable cancelable) {
if (isShutdown) {
return null;
if (cancelable instanceof TimedCancelable && maximumTaskLifeMillis != 0L) {
return getInstance().submit((TimedCancelable) cancelable);
} else {
return getInstance().submit(cancelable);



* 提交任务2 获得操作句柄
* Submit a {@link Cancelable} for execution and return a
* {@link TaskHandle} for the running task or null if the task has not been
* accepted. After {@link LazyThreadPool#shutdown(boolean, long)} returns
* this will always return null.
TaskHandle submit(Cancelable cancelable) {
try {
// taskFuture is used to cancel 'cancelable' and to determine if
// 'cancelable' is done.
Future<?> taskFuture = completionService.submit(cancelable, null);
return new TaskHandle(cancelable, taskFuture, clock.getTimeMillis());
} catch (RejectedExecutionException re) {
if (!executor.isShutdown()) {
LOGGER.log(Level.SEVERE, "Unable to execute task", re);
return null;






* 提交任务1
* Submit a {@link TimedCancelable} for execution and return a
* {@link TaskHandle} for the running task or null if the task has not been
* accepted. After {@link LazyThreadPool#shutdown(boolean, long)} returns
* this will always return null.
TaskHandle submit(TimedCancelable cancelable) {
try {
// When timeoutTask is run it will cancel 'cancelable'.
TimeoutTask timeoutTask = new TimeoutTask(cancelable);

// Schedule timeoutTask to run when 'cancelable's maximum run interval
// has expired.
// timeoutFuture will be used to cancel timeoutTask when 'cancelable'
// completes.
Future<?> timeoutFuture = timeoutService.schedule(timeoutTask,
maximumTaskLifeMillis, TimeUnit.MILLISECONDS);

// cancelTimeoutRunnable runs 'cancelable'. When 'cancelable' completes
// cancelTimeoutRunnable cancels 'timeoutTask'. This saves system
// resources. In addition it prevents timeout task from running and
// calling cancel after 'cancelable' completes successfully.
CancelTimeoutRunnable cancelTimeoutRunnable =
new CancelTimeoutRunnable(cancelable, timeoutFuture);

// taskFuture is used to cancel 'cancelable' and to determine if
// 'cancelable' is done.
Future<?> taskFuture =
completionService.submit(cancelTimeoutRunnable, null);
TaskHandle handle =
new TaskHandle(cancelable, taskFuture, clock.getTimeMillis());

// TODO(strellis): test/handle timer pop/cancel before submit. In
// production with a 30 minute timeout this should never happen.
return handle;
} catch (RejectedExecutionException re) {
if (!executor.isShutdown()) {
LOGGER.log(Level.SEVERE, "Unable to execute task", re);
return null;


* 静态内部类  检测线程超时
* A task that cancels another task that is running a {@link TimedCancelable}.
* The {@link TimeoutTask} should be scheduled to run when the interval for
* the {@link TimedCancelable} to run expires.
private static class TimeoutTask implements Runnable {
final TimedCancelable timedCancelable;
private volatile TaskHandle taskHandle;

TimeoutTask(TimedCancelable timedCancelable) {
this.timedCancelable = timedCancelable;

public void run() {
if (taskHandle != null) {

void setTaskHandle(TaskHandle taskHandle) {
this.taskHandle = taskHandle;

然后延迟执行该线程,获得Future<?> timeoutFuture线程句柄

// Schedule timeoutTask to run when 'cancelable's maximum run interval
// has expired.
// timeoutFuture will be used to cancel timeoutTask when 'cancelable'
// completes.
Future<?> timeoutFuture = timeoutService.schedule(timeoutTask,
maximumTaskLifeMillis, TimeUnit.MILLISECONDS);

然后构造CancelTimeoutRunnable对象,传入TimedCancelable类型对象和Future<?> timeoutFuture线程句柄

// cancelTimeoutRunnable runs 'cancelable'. When 'cancelable' completes
// cancelTimeoutRunnable cancels 'timeoutTask'. This saves system
// resources. In addition it prevents timeout task from running and
// calling cancel after 'cancelable' completes successfully.
CancelTimeoutRunnable cancelTimeoutRunnable =
new CancelTimeoutRunnable(cancelable, timeoutFuture);


* 内部类LazyThreadPool的内部类1
* 执行TimedCancelable cancelable的run方法
* 执行完毕后取消超时线程
* A {@link Runnable} for running {@link TimedCancelable} that has been
* guarded by a timeout task. This will cancel the timeout task when the
* {@link TimedCancelable} completes. If the timeout task has already run,
* then canceling it has no effect.
private class CancelTimeoutRunnable implements Runnable {
private final Future<?> timeoutFuture;
private final TimedCancelable cancelable;

* Constructs a {@link CancelTimeoutRunnable}.
* @param cancelable the {@link TimedCancelable} this runs.
* @param timeoutFuture the {@link Future} for canceling the timeout task.
CancelTimeoutRunnable(TimedCancelable cancelable, Future<?> timeoutFuture) {
this.timeoutFuture = timeoutFuture;
this.cancelable = cancelable;

public void run() {
try {
} finally {



// taskFuture is used to cancel 'cancelable' and to determine if
// 'cancelable' is done.
Future<?> taskFuture =
completionService.submit(cancelTimeoutRunnable, null);
TaskHandle handle =
new TaskHandle(cancelable, taskFuture, clock.getTimeMillis());

// TODO(strellis): test/handle timer pop/cancel before submit. In
// production with a 30 minute timeout this should never happen.


* 关闭线程池任务
* Shut down the LazyThreadPool.
* @param interrupt {@code true} if the threads executing tasks task should
*        be interrupted; otherwise, in-progress tasks are allowed to
*        complete normally.
* @param waitMillis maximum amount of time to wait for tasks to complete.
* @return {@code true} if all the running tasks terminated, or
*         {@code false} if some running task did not terminate.
* @throws InterruptedException if interrupted while waiting.
boolean shutdown(boolean interrupt, long waitMillis)
throws InterruptedException {
if (interrupt) {
} else {
if (timeoutService != null) {
try {
return executor.awaitTermination(waitMillis, TimeUnit.MILLISECONDS);
} finally {
if (timeoutService != null) {


* 内部类LazyThreadPool的内部类2
* 获取结果线程
* A task that gets completion information from all the tasks that run in a
* {@link CompletionService} and logs uncaught exceptions that cause the
* tasks to fail.
private class CompletionTask implements Runnable {
private void completeTask() throws InterruptedException {
Future<?> future = completionService.take();
try {
} catch (CancellationException e) {
LOGGER.info("Batch terminated due to cancellation.");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// TODO(strellis): Should we call cancelable.cancel() if we get an
// exception?
if (cause instanceof InterruptedException) {
LOGGER.log(Level.INFO, "Batch terminated due to an interrupt.",
} else {
LOGGER.log(Level.SEVERE, "Batch failed with unhandled exception: ",

public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
} catch (InterruptedException ie) {
LOGGER.info("Completion task shutdown.");


* A {@link ThreadFactory} that adds a prefix to thread names assigned
* by {@link Executors#defaultThreadFactory()} to provide diagnostic
* context in stack traces.
private static class ThreadNamingThreadFactory implements ThreadFactory {
private final ThreadFactory delegate = Executors.defaultThreadFactory();
private final String namePrefix;

ThreadNamingThreadFactory(String namePrefix) {
this.namePrefix = namePrefix + "-";

public Thread newThread(Runnable r) {
Thread t = delegate.newThread(r);
t.setName(namePrefix + t.getName());
return t;



转载请注明出处 博客园 刺猬的温驯

本人邮箱: chenying998179@163#com (#改为.)

本文链接 /article/4693853.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息