您的位置:首页 > 其它

线程池应用实战

2015-08-26 08:44 211 查看
<pre name="code" class="java">public class ThreadLoadExecuterKeeper implements Runnable {

private static Logger logger = LoggerFactory.getLogger(ThreadLoadExecuterKeeper2.class);

private ThreadPoolExecutor executor = null;
private int threadLoad = Runtime.getRuntime().availableProcessors();
private static int CAPACITY = 20;

private List<JobFinder> finders = new ArrayList<JobFinder>();
private Iterator<JobFinder> jobIt = null;

public ThreadLoadExecuterKeeper2(String thread_load){
init(thread_load);
}

public void init(String thread_load){
if(thread_load!=null && !thread_load.trim().equals("")){
int newThreadLoad = Integer.parseInt(thread_load);
executor = new ThreadPoolExecutor(newThreadLoad, threadLoad*2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
}else{
executor = new ThreadPoolExecutor(threadLoad*2, threadLoad*2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
}
executor.setThreadFactory(new ProcessWorkerThreadFactory("WorkerThreadPool"));
}

public void addJobFinder(JobFinder finder){
if(finder==null){
throw new NullPointerException();
}
finders.add(finder);
jobIt = null;
}

public Job findJob() {
int c = finders.size();
if (c <= 0) return null;

Iterator<JobFinder> jobIt = getJobIterator();
if (jobIt.hasNext()) {
return jobIt.next().findJob();
} else {
return null;
}
}

private Iterator<JobFinder> getJobIterator() {
if (jobIt == null)
jobIt = finders.iterator();
if (!jobIt.hasNext())
jobIt = finders.iterator();
return jobIt;
}

/**
* 迭代线程池中的线程
*/
public void getAlivedThread(){
ProcessWorkerThreadFactory threadFactory = (ProcessWorkerThreadFactory) executor.getThreadFactory();
ConcurrentHashMap<String, Thread> threadPoolMap = threadFactory.threadPoolMap;
for(Map.Entry<String,Thread> entry : threadPoolMap.entrySet()){
String threadName = entry.getKey();
Thread thread = entry.getValue();
System.out.println(">>>>>>>>>thread name is :"+threadName+"========="+thread.getName());
}
}

@Override
public void run() {
Job jobRef = null;
boolean isJobComing = false;
while(true){

if(executor.getActiveCount()==executor.getCorePoolSize()){
continue;
}else if(executor.getActiveCount()==0){
isJobComing = false;
}

System.err.println("正在执行任务的线程个数:"+executor.getActiveCount());

jobRef = findJob();
if(jobRef==null){
try {
if(isJobComing){
Thread.sleep(1000);
}else{
Thread.sleep(3000);
}
} catch (InterruptedException e) {
logger.info("线程sleep异常!");
}
}else{
isJobComing = true;
Worker worker = new Worker();
worker.setJob(jobRef);
Future<Job> future = executor.submit(worker);

JobTimerTask task = new JobTimerTask();
task.setWorker(worker);
task.setFuture(future);
Timer timer = new Timer();
timer.schedule(task, 1000*60*5);
}
}
}

private static class Worker implements Callable<Job>{
private Job jobRef;
private volatile boolean isDone = false;

public synchronized void setJob(Job jobRef){
this.jobRef = jobRef;
}

public boolean getIsDone(){
return this.isDone;
}

@Override
public Job call() throws Exception {
logger.info(Thread.currentThread().getName()+"开始运行。");
if(jobRef!=null){
jobRef.process();
}
isDone = true;
logger.info(Thread.currentThread().getName()+"运行完成。");
return jobRef;
}
}

/**
* 定时任务:用来取消超时的线程
* @author
*
*/
private static class JobTimerTask extends TimerTask{
private Worker worker;
private Future<Job> future;
public void setWorker(Worker worker){
this.worker = worker;
}
public void setFuture(Future<Job> future){
this.future = future;
}
public void run(){
if(worker!=null){
future.cancel(true);
//如果定时任务线程检测到线程池中的任务由于异常而终止,仅仅将此任务id从"making"队列中删除,不需要将它加入到超时队列中
//这种情况下,isDoneByException变量为true,isDone变量为false
if(worker.jobRef.getIsDoneByException()){
worker.jobRef.afterProcess();
}else{
//如果任务执行时间过长,但并没有发生异常,那么除了将此任务id从"making"队列中删除,还把它加入到超时队列中
//这种情况下,isDoneByException变量为false,isDone变量为false
if(!worker.getIsDone()){
worker.jobRef.afterProcess();
DesignCache cache = worker.jobRef.getDesignCache();
Integer designId = worker.jobRef.getDesignId();
if(!worker.jobRef.getFlag().trim().equals("makethumb")){
cache.addTimeoutDesign(designId);
}
}
}
}
}
}

/**
* 自定义线程池工厂
* @author ligx
*
*/
class ProcessWorkerThreadFactory implements ThreadFactory{
public ConcurrentHashMap<String,Thread> threadPoolMap = new ConcurrentHashMap<String, Thread>();
private String poolName = null;

public ProcessWorkerThreadFactory(String poolName){
this.poolName = poolName;
}

@Override
public Thread newThread(Runnable r) {
return new ProcessWorkerThread(r,poolName,threadPoolMap);
}
}

/**
* 自定义线程池中的线程
* @author ligx
*
*/
static class ProcessWorkerThread extends Thread{
private static final AtomicInteger created = new AtomicInteger();
private ConcurrentHashMap<String,Thread> threadPoolMap = null;

public ProcessWorkerThread(Runnable r, String poolName, ConcurrentHashMap<String,Thread> threadPoolMap){
super(r, poolName+"--"+created.incrementAndGet());
this.threadPoolMap = threadPoolMap;
}

public void run(){
<span style="white-space:pre">	</span>super.run();
}
}
}




                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: