您的位置:首页 > 其它

动态控制线程池中正在运行的任务

2017-05-14 21:49 197 查看
看起来很容易,也确实很容易,但是要不执行多余代码就很麻烦了。

先说说简单实现吧,我们在添加任务时可以利用List<Runnable>记录下来运行了的任务;

注意这个Runnable必须自定义并且在里面添加停止任务的方法,一般可以在执行步骤里面或者前面判断是否继续运行,例如

public class MyRunnbale implements Runnable{

public volatile boolean exit=false;

Override

void run(){

if(exit)

return;

......//进行判断

}


}

当需要退出任务时获取需要停止的任务直接令任务的exit=true;

这样就能退出了;

一般可以在

继承DiscardOldestPolicy类重写
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)方法;
可以在这里面传递静态变量使用for进行关闭任务;
如果想要更清楚的控制正在运行的任务则需要基于ThreadPoolExecutor做改造了,改造成自己想要的线程池;
改造前需要拿到源码修改包名等,这些我都做好了你们可以直接下载就能用,下载后可以将源码作为自己的类使用;因此也可以进行修改等等;
一般在下面的方法中可以动态管理正在执行的任务
public volatile List<Runnable> mRunningTasks=new LinkedList<>();// TODO: 2017/5/14 正在运行中的任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);Throwable thrown = null;
try {
mRunningTasks.add(task);//在worker执行任务前添加任务到正在运行的任务的列表;
//System.out.println("添加了任务");
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {mRunningTasks.remove(task);//如果任务执行完了就移除
task = null;
w.completedTasks++;                 w.unlock();
}
}
completedAbruptly = [b]false;  } finally {
processWorkerExit(w, completedAbruptly);}
}
然后在添加一个处理退出这些任务的方法
例如:
public  void tryStopRunningTasks(){
ControllableRunnable controllableRunnable;
Runnable runnable;
try {
for (int i=0;i< mRunningTasks.size();i++ ){
if (i>=mRunningTasks.size()-1)
break;
runnable=mRunningTasks.get(i);
controllableRunnable=(ControllableRunnable)runnable;
if (controllableRunnable==null||controllableRunnable.exit==true)
continue;
controllableRunnable.exit=true;
}
} catch (NullPointerException e1) {

}
}
最后重写
public void rejectedExecution(Runnable r, ThreadPoolExecutor e)方法
例如当任务缓存队列满后调用:
public void rejectedExecution(Runnable r, ControllableThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();// TODO: 2017/5/10 移除最早的没有执行的任务
e.tryStopRunningTasks();e.executeControllableRunnable((ControllableRunnable) r);}
}
这样如果线程池的任务缓存队列满后(意味着线程池所有的线程全部阻塞,这时如果不结束正在运行的任务就会阻塞之后所有任务)就能够畅通线程池,防止阻塞发生。
ThreadPoolExecutor.java 和
RejectedExecutionHandler.java 的
源码在下面注意这两个必须一起否则会报错
如果出现包名错误自行修改

下载地址 源码下载地址
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐