您的位置:首页 > 编程语言 > Java开发

Java 并发编程之任务取消(七)

2014-09-01 22:55 288 查看

只运行一次的服务

以检查是否有新邮件为例

采用 AtomicBoolean 而不使用 volatile 类型的 boolean,是因为需要从内部的匿名 Runnable 中访问 hasNewMail 标志,因此该局部变量必须声明为 final;而 final 变量无法被重新赋值,所以改用 AtomicBoolean 这个可变的容器来保存布尔结果。

/**
 * Checks all of the given hosts for new mail in parallel, waiting up to the
 * given timeout for the checks to complete.
 *
 * <p>An {@code AtomicBoolean} is used instead of a {@code volatile boolean}
 * because the flag is captured by an anonymous inner class: the local variable
 * must be final, so a mutable holder object is needed to carry the result out.
 *
 * @param hosts   the mail hosts to poll (each is checked on its own pool thread)
 * @param timeout maximum time to wait for the checks to finish
 * @param unit    time unit of {@code timeout}
 * @return {@code true} if at least one host reported new mail before the deadline
 * @throws InterruptedException if interrupted while waiting for termination
 */
boolean checkMain(Set<String> hosts, long timeout, TimeUnit unit)
        throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    final AtomicBoolean hasNewMail = new AtomicBoolean(false);
    try {
        // "final" is required so the anonymous Runnable may capture the loop
        // variable (mandatory before Java 8's "effectively final" rule).
        for (final String host : hosts) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    if (checkMail(host)) {
                        hasNewMail.set(true);
                    }
                }
            });
        }
    } finally {
        exec.shutdown();
        exec.awaitTermination(timeout, unit);
    }
    // NOTE(review): if awaitTermination times out, checks still in flight may
    // set the flag after we return — the result only reflects completed checks.
    return hasNewMail.get();
}

ShutdownNow的局限性

上次说过 shutdownNow 会返回所有已提交但尚未开始执行的任务,但它无法告诉我们哪些任务已经开始执行却尚未完成。解决办法是封装 ExecutorService,让 execute 记录哪些任务是在关闭之后被取消的;同时被中断的任务在返回时必须保持线程的中断状态。

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * An {@link ExecutorService} decorator that records which tasks were cancelled
 * at shutdown, so callers can later retrieve (and e.g. re-submit) work that was
 * interrupted mid-execution. Based on Listing 7.21 of
 * "Java Concurrency in Practice".
 */
public class TrackingExecutor extends AbstractExecutorService {
    private final ExecutorService exec;
    // Synchronized set: pool worker threads add to it concurrently from the
    // wrapper Runnable created in execute().
    private final Set<Runnable> tasksCancelledAtShutdown = Collections
            .synchronizedSet(new HashSet<Runnable>());

    public TrackingExecutor(ExecutorService exec) {
        this.exec = exec;
    }

    /**
     * Returns the tasks that were interrupted by shutdown before completing.
     *
     * @return a snapshot list of the cancelled tasks
     * @throws IllegalStateException if the executor has not yet terminated —
     *         the result is only well-defined after termination
     */
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("Executor has not terminated yet");
        }
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    @Override
    public void execute(final Runnable command) {
        exec.execute(new Runnable() {
            public void run() {
                try {
                    command.run();
                } finally {
                    // A task counts as "cancelled at shutdown" when the executor
                    // is shutting down AND this worker thread was interrupted
                    // (i.e. shutdownNow() interrupted a running task).
                    if (isShutdown() && Thread.currentThread().isInterrupted()) {
                        tasksCancelledAtShutdown.add(command);
                    }
                }
            }
        });
    }

    // All lifecycle methods simply delegate to the wrapped executor.

    @Override
    public void shutdown() {
        exec.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        return exec.shutdownNow();
    }

    @Override
    public boolean isShutdown() {
        return exec.isShutdown();
    }

    @Override
    public boolean isTerminated() {
        return exec.isTerminated();
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

}


将ExecutorService的其他方法委托给exec。以网页爬虫为例。使用TrackingExecutor的用法

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates TrackingExecutor (JCiP Listing 7.22): a crawler that can be
 * stopped, and on stop saves the pages that were still in flight or not yet
 * started so a later run could resume them. Subclasses supply
 * {@link #processPage(URL)}, which returns the links found on a page.
 */
public abstract class WebCrawler {
    // volatile: written by start()/stop() on the caller thread, read by pool
    // threads inside submitCrawlTask.
    private volatile TrackingExecutor exec;
    // Touched only from the synchronized start()/stop() path (and from main
    // before the crawler is started), so a plain HashSet suffices here.
    private final Set<URL> urltoCrawl = new HashSet<URL>();

    public static void main(String[] args) {
        WebCrawler webc = new WebCrawler() {
            @Override
            protected List<URL> processPage(URL url) {
                // Test stub: pretend every page links back to the same URL.
                System.out.println(url.getHost());
                List<URL> url2 = new ArrayList<URL>();
                try {
                    url2.add(new URL("http://www.baidu.com"));
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
                return url2;
            }
        };
        try {
            webc.urltoCrawl.add(new URL("http://www.baidu.com"));
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }
        webc.start();

        try {
            // Let it crawl for a second, then shut it down.
            Thread.sleep(1000);
            webc.stop();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /** Starts crawling every pending URL on a fresh executor. */
    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urltoCrawl) {
            submitCrawlTask(url);
        }
        urltoCrawl.clear();
    }

    /**
     * Stops the crawler, saving both the tasks that never started
     * (returned by shutdownNow) and the tasks that were interrupted
     * mid-execution (reported by the tracking executor after termination).
     * Safe to call before start() or more than once: it is then a no-op.
     */
    public synchronized void stop() throws InterruptedException {
        if (exec == null) {
            return; // never started, or already stopped
        }
        try {
            saveUncrawled(exec.shutdownNow());
            // getCancelledTasks is only valid once the executor terminated.
            if (exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) {
                saveUncrawled(exec.getCancelledTasks());
            }
        } finally {
            exec = null;
        }
    }

    /** Returns the outgoing links found on the given page. */
    protected abstract List<URL> processPage(URL url);

    // Record unfinished work so a later start() could resume it.
    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable runnable : uncrawled) {
            urltoCrawl.add(((CrawlTask) runnable).getPage());
        }
    }

    private void submitCrawlTask(URL u) {
        // Snapshot the volatile field: stop() may null it concurrently while
        // a running CrawlTask is still submitting discovered links.
        TrackingExecutor e = exec;
        if (e != null) {
            e.execute(new CrawlTask(u));
        }
    }

    private class CrawlTask implements Runnable {
        private final URL url;

        public CrawlTask(URL url) {
            this.url = url;
        }

        @Override
        public void run() {
            for (URL link : processPage(url)) {
                // Cooperative cancellation: shutdownNow interrupts us.
                if (Thread.currentThread().isInterrupted())
                    return;
                submitCrawlTask(link);
            }
        }

        public URL getPage() {
            return url;
        }
    }
}
上面程序中抽象方法processPage中的代码是测试用的。里面正确的代码目的是获取该url下所有的链接。其实也不难。但是不属于本篇知识点内。只要得到所有<a href就可以了。有兴趣的可以看看元素选择器 还有一个开源的爬虫Crawl研究一下。

执行 stop 方法后会调用 saveUncrawled 方法收集关闭时被取消(含正在执行中)的任务。之后无论是要保存到文件还是做其他处理,只要把相应代码添加到 saveUncrawled 内即可。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 多线程 并发