Java 并发编程之任务取消(七)
2014-09-01 22:55
288 查看
只运行一次的服务
以检查是否有新邮件为栗采用AtomicBoolean而不使用Volatile类型的boolean.是因为能从内部的Runnable中访问hasNewMail标志,因此它必须是final类型以免被修改。
boolean checkMain(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasNewMail = new AtomicBoolean(false); try { for (String host : hosts) { exec.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub if (checkMail(host)) { hasNewMail.set(true); } } }); } } finally { exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); }
ShutdownNow的局限性
上次说shutdownNow会返回所有尚未启动的Runnable。但是它无法返回的是正在执行的Runnable。通过封装ExecutorService并使得execute记录哪些任务是在关闭后取消的。并且在任务返回时必须保持线程的中断状态import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class TrackingExecutor extends AbstractExecutorService { private final ExecutorService exec; private final Set<Runnable> tasksCancelledAtShutdown = Collections .synchronizedSet(new HashSet<Runnable>()); public TrackingExecutor(ExecutorService exec) { super(); this.exec = exec; } public List<Runnable> getCancelledTasks() { if (!exec.isTerminated()) { throw new IllegalStateException("terminated"); } return new ArrayList<Runnable>(tasksCancelledAtShutdown); } @Override public void execute(final Runnable command) { // TODO Auto-generated method stub exec.execute(new Runnable() { public void run() { try { command.run(); } finally { if (isShutdown() && Thread.currentThread().isInterrupted()) { tasksCancelledAtShutdown.add(command); } } } }); } @Override public void shutdown() { // TODO Auto-generated method stub } @Override public List<Runnable> shutdownNow() { // TODO Auto-generated method stub return null; } @Override public boolean isShutdown() { // TODO Auto-generated method stub return false; } @Override public boolean isTerminated() { // TODO Auto-generated method stub return false; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { // TODO Auto-generated method stub return false; } }
将ExecutorService的其他方法委托给exec。以网页爬虫为例。使用TrackingExecutor的用法
import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public abstract class WebCrawler { private volatile TrackingExecutor exec; private final Set<URL> urltoCrawl = new HashSet<URL>(); public static void main(String[] args) { WebCrawler webc = new WebCrawler() { @Override protected List<URL> processPage(URL url) { System.out.println(url.getHost()); List<URL> url2 = new ArrayList<URL>(); try { url2.add(new URL("http://www.baidu.com")); } catch (MalformedURLException e) { // TODO Auto-generated catch block e.printStackTrace(); } return url2; } }; try { webc.urltoCrawl.add(new URL("http://www.baidu.com")); } catch (MalformedURLException e) { // TODO Auto-generated catch block e.printStackTrace(); } webc.start(); try { Thread.sleep(1000); webc.stop(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public synchronized void start() { exec = new TrackingExecutor(Executors.newCachedThreadPool()); for (URL url : urltoCrawl) { submitCrawlTask(url); } urltoCrawl.clear(); } public synchronized void stop() throws InterruptedException { try { saveUncrawled(exec.shutdownNow()); if (exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) { saveUncrawled(exec.getCancelledTasks()); } } finally { exec = null; } } protected abstract List<URL> processPage(URL url); private void saveUncrawled(List<Runnable> uncrawled) { for (Runnable runnable : uncrawled) { urltoCrawl.add(((CrawlTask) runnable).getPage()); } } private void submitCrawlTask(URL u) { exec.execute(new CrawlTask(u)); } private class CrawlTask implements Runnable { private final URL url; public CrawlTask(URL url) { super(); this.url = url; } @Override public void run() { // TODO Auto-generated method stub for (URL link : processPage(url)) { if (Thread.currentThread().isInterrupted()) return; submitCrawlTask(link); } } public URL getPage() { return url; } } }上面程序中抽象方法processPage中的代码是测试用的。里面正确的代码目的是获取该url下所有的链接。其实也不难。但是不属于本篇知识点内。只要得到所有<a href就可以了。有兴趣的可以看看元素选择器 还有一个开源的爬虫Crawl研究一下。
执行Stop方法后会调用saveUncrawle方法获取正在执行中的任务。之后要进行保存到文件中还是怎么办只要把代码添加到saveUncrawled内即可。
相关文章推荐
- Java 并发编程之任务取消(六)
- Java 并发编程之任务取消(五)
- Java 并发编程之任务取消
- Java 并发编程之任务取消 (二)
- Java 并发编程之任务取消(八)
- Java 并发编程之任务取消(九)
- Java并发编程-20-在执行器中取消任务和控制任务的完成
- Java 并发编程之任务取消(四)
- java并发编程,通过Future取消任务
- Java并发编程-27-异常处理及取消任务
- Java 并发编程之任务取消 (三)
- 并发编程 10—— 任务取消 之 关闭 ExecutorService
- Java 并发编程实战学习笔记——路径查找类型并行任务的终止
- Java并发编程-15-并发任务间数据交换
- 并发编程 12—— 任务取消与关闭 之 shutdownNow 的局限性
- Java高并发编程——为IO密集型应用设计线程数与划分任务
- Java并发编程-26-异步运行任务
- Java并发编程-21-在执行器中分离任务的启动与结果的处理
- Java高并发编程——为IO密集型应用设计线程数与划分任务
- Java并发编程-19-在执行器中延时执行任务和周期性执行任务