线程池应用实战
2015-08-26 08:44
211 查看
<pre name="code" class="java">public class ThreadLoadExecuterKeeper implements Runnable { private static Logger logger = LoggerFactory.getLogger(ThreadLoadExecuterKeeper2.class); private ThreadPoolExecutor executor = null; private int threadLoad = Runtime.getRuntime().availableProcessors(); private static int CAPACITY = 20; private List<JobFinder> finders = new ArrayList<JobFinder>(); private Iterator<JobFinder> jobIt = null; public ThreadLoadExecuterKeeper2(String thread_load){ init(thread_load); } public void init(String thread_load){ if(thread_load!=null && !thread_load.trim().equals("")){ int newThreadLoad = Integer.parseInt(thread_load); executor = new ThreadPoolExecutor(newThreadLoad, threadLoad*2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY)); }else{ executor = new ThreadPoolExecutor(threadLoad*2, threadLoad*2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY)); } executor.setThreadFactory(new ProcessWorkerThreadFactory("WorkerThreadPool")); } public void addJobFinder(JobFinder finder){ if(finder==null){ throw new NullPointerException(); } finders.add(finder); jobIt = null; } public Job findJob() { int c = finders.size(); if (c <= 0) return null; Iterator<JobFinder> jobIt = getJobIterator(); if (jobIt.hasNext()) { return jobIt.next().findJob(); } else { return null; } } private Iterator<JobFinder> getJobIterator() { if (jobIt == null) jobIt = finders.iterator(); if (!jobIt.hasNext()) jobIt = finders.iterator(); return jobIt; } /** * 迭代线程池中的线程 */ public void getAlivedThread(){ ProcessWorkerThreadFactory threadFactory = (ProcessWorkerThreadFactory) executor.getThreadFactory(); ConcurrentHashMap<String, Thread> threadPoolMap = threadFactory.threadPoolMap; for(Map.Entry<String,Thread> entry : threadPoolMap.entrySet()){ String threadName = entry.getKey(); Thread thread = entry.getValue(); System.out.println(">>>>>>>>>thread name is :"+threadName+"========="+thread.getName()); } } @Override public void run() { Job jobRef = null; boolean isJobComing = false; while(true){ if(executor.getActiveCount()==executor.getCorePoolSize()){ continue; }else if(executor.getActiveCount()==0){ isJobComing = false; } System.err.println("正在执行任务的线程个数:"+executor.getActiveCount()); jobRef = findJob(); if(jobRef==null){ try { if(isJobComing){ Thread.sleep(1000); }else{ Thread.sleep(3000); } } catch (InterruptedException e) { logger.info("线程sleep异常!"); } }else{ isJobComing = true; Worker worker = new Worker(); worker.setJob(jobRef); Future<Job> future = executor.submit(worker); JobTimerTask task = new JobTimerTask(); task.setWorker(worker); task.setFuture(future); Timer timer = new Timer(); timer.schedule(task, 1000*60*5); } } } private static class Worker implements Callable<Job>{ private Job jobRef; private volatile boolean isDone = false; public synchronized void setJob(Job jobRef){ this.jobRef = jobRef; } public boolean getIsDone(){ return this.isDone; } @Override public Job call() throws Exception { logger.info(Thread.currentThread().getName()+"开始运行。"); if(jobRef!=null){ jobRef.process(); } isDone = true; logger.info(Thread.currentThread().getName()+"运行完成。"); return jobRef; } } /** * 定时任务:用来取消超时的线程 * @author * */ private static class JobTimerTask extends TimerTask{ private Worker worker; private Future<Job> future; public void setWorker(Worker worker){ this.worker = worker; } public void setFuture(Future<Job> future){ this.future = future; } public void run(){ if(worker!=null){ future.cancel(true); //如果定时任务线程检测到线程池中的任务由于异常而终止,仅仅将此任务id从"making"队列中删除,不需要将它加入到超时队列中 //这种情况下,isDoneByException变量为true,isDone变量为false if(worker.jobRef.getIsDoneByException()){ worker.jobRef.afterProcess(); }else{ //如果任务执行时间过长,但并没有发生异常,那么除了将此任务id从"making"队列中删除,还把它加入到超时队列中 //这种情况下,isDoneByException变量为false,isDone变量为false if(!worker.getIsDone()){ worker.jobRef.afterProcess(); DesignCache cache = worker.jobRef.getDesignCache(); Integer designId = worker.jobRef.getDesignId(); if(!worker.jobRef.getFlag().trim().equals("makethumb")){ cache.addTimeoutDesign(designId); } } } } } } /** * 自定义线程池工厂 * @author ligx * */ class ProcessWorkerThreadFactory implements ThreadFactory{ public ConcurrentHashMap<String,Thread> threadPoolMap = new ConcurrentHashMap<String, Thread>(); private String poolName = null; public ProcessWorkerThreadFactory(String poolName){ this.poolName = poolName; } @Override public Thread newThread(Runnable r) { return new ProcessWorkerThread(r,poolName,threadPoolMap); } } /** * 自定义线程池中的线程 * @author ligx * */ static class ProcessWorkerThread extends Thread{ private static final AtomicInteger created = new AtomicInteger(); private ConcurrentHashMap<String,Thread> threadPoolMap = null; public ProcessWorkerThread(Runnable r, String poolName, ConcurrentHashMap<String,Thread> threadPoolMap){ super(r, poolName+"--"+created.incrementAndGet()); this.threadPoolMap = threadPoolMap; } public void run(){ <span style="white-space:pre"> </span>super.run(); } } }
相关文章推荐
- 谈谈对工作的看法
- eclipse时间,log时间,控制台打印时间
- jQuery的deferred对象详解
- MD5 32位加密
- jQuery如何实现点击页面获得当前点击元素
- Python源码分析3 – 词法分析器PyTokenizer http://blog.csdn.net/atfield/article/details/1439068
- Hibernate中Session的操作解释
- Android studio最新版持续更新中
- 容器类的迭代器讨论
- oracle监听和实例启动顺序浅谈
- poj1562&&hdoj1241Oil Deposits
- java之异常处理
- POJ-1019-Number Sequence-组合数学
- 设计模式—那点事儿
- Android 完美实现图片圆角和圆形(对实现进行分析)
- 在一个服务器上搭建好php环境,在服务器上可以访问,其他机器可以访问服务器但网页无法访问
- Linux系统环境下关于多进程并发写同一个文件的讨论
- EBS-如何查看非自己提交的请求的结果
- 黑马程序员——Java 基础 面向对象之多态 (复习)
- 移动APP整体策划-007-App界面交互设计规范