您的位置:首页 > 编程语言 > Java开发

多线程设计模式——Thread Confinement(串行线程封闭)模式

2016-07-12 07:19 393 查看
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。

模式名称

Thread Confinement(串行线程封闭)模式

模式解决的问题

如果并发任务的执行涉及某个非线程安全对象,而很多时候我们又不希望因此而引入锁。

解决思路

Thread Confinement(串行线程封闭)模式的核心思想是通过将多个并发的任务存入队列实现任务的串行化,并为这些串行化为任务创建唯一的工作者线程进行处理。

Serializer:负责对外暴露服务接口以及工作者线程的生命管理,并将客户端对其的并发服务调用转换为相应的任务以实现服务串行化。Service表示SerialThreadConfinerment模式对外暴露的服务方法,该方法将客户端对其的并发调用串行化,init初始化SerialThreadConfinerment模式对外暴露的服务,该方法可以启动工作者线程,shutdown停止SerialThreadConfinerment模式对外暴露的服务,该方法可以停止工作者线程。

WokerThrea:工作者线程,负责接收Serializer提交的并发任务以及这些任务的执行。Submit用于Serializer提交并发任务,并将这些任务串行化,dispath用于执行串行化任务。

Queue:Serializer和WorkerThread间的缓冲区,用于实现并发任务的串行化enqueue用于并发任务入队列,dequeue用于任务出队列。

NonThreadSafeObject工作者线程执行任务时所需访问的非线程安全对象。

Created with Raphaël 2.1.0ClientClientSerializerSerializerWorkerThreadWorkerThreadSeralizerSeralizerQueueQueueNonThreadSafeObjectNonThreadSafeObject1init()2create3start()456service()7submit()8enqueue()9101112dispatch13dispatch()1415create1617

例子代码

某系统需要支持从内网的某台FTP服务器上下载一批文件的功能,该功能的实现会用到一款开元的FTP组件,该组件并非线程安全,所以需要用到串行线程封闭模式。

文件下载服务类源码

public class MessageFileDownloader {
//模式角色SerialThreadConfiment WorkerThread
private final WorkerThread workerThread;

public MessageFileDownlader(String outputDir,final String ftpServer,
final String userName,final String password){
workerThread = new WorkerThread(outputDir,ftpServer,userName,password);
}

public void init(){
workerThread.start();
}

public void shutdown(){
workerThread.terminate();
}

public void downloadFile(String file){
workerThread.download(file);
}

//模式角色:SerialThreadConfiment.WorkerThread
private static class WorkerThread extends AbstractTerminatableThread{
//模式角色:SerialThreadCOnfiment.Queue
private final BlockingQueue<String>workQueue;
private final Future<FPClient>ftpClientPromise;
private final String outputDir;

public WorkerThread(String outputDir,final String ftpServer,
final String userName,final String password){
this.workQueue = new ArrayBlockingQueue<Stirng>(100);
this.outputDir = outputDir + '/';

this.ftpClientPromise = new FutureTask<FTPClient>(){
new Callable<FTPClient>(){
@Override
public FTPClient call() throws Exception{
FTPClient ftpClient = initFTPClient(ftpServer,userName,
password);
return ftpClient;
}
}
};
new Thread((FutureTask<FTPClient>) ftpClientPromise).start();
}

public void download(String file){
try{
workQueue.put(file);
}catch(InterruptedException e){
;
}
}

private FTPClient initFTPClient(String ftpServer,String userName,
String password) throws Exception{
FtpClient ftpClient = new FTPClient();

FTPClientConfig config = new FTPClientConfig();
ftpClient.configure(config);

int reply;
ftpClient.connect(ftpServer);

System.out.print(ftpClient.getReplyString());

reply = ftpClient.getReplyCode();

if(!FTPReply.isPositiveCompletion(reply)){
ftpClient.disconnect();
throw new RuntimeException("FTP server refused connection.");
}
boolean isOK = ftpClient.login(userName, password);
if(isOK){
System.out.println(ftpClient.getReplyString();
}else{
throw new RuntimeException("Failed to login."
+ ftpClient.getRepleyString());
}

reply = ftpClient.cwd("`/messages");
if(!FTPReply.isPositiveCompletion(reply)){
ftpClient.disconnect();
throw new RuntimeException("Failed to change working" +
"directory.reply:" + reply);
}else{
System.out.println(ftpClient.getReplyString());
}
ftpClient.setFileype(FTP.ASCII_FILE_TYPE);
return ftpClient;
}

@Override
protected void doRun() throws Exception{
String file =workQueue.take();
OutputStream os = null;
try{
os = new BufferedOutputStream(new FileOutputStream(outputDir +
file));
}finally{
if(null != os){
try{
os.close();
}catch(Exception e){
e.printStackTrace();
}
}
terminationToken.reservations.decrementAndGet();
}

protected void doCleanup(Exception cause){
try{
ftpClientPromise.get().diaconnect();
}catch(IOException e){
e.printStackTrace();
}catch(InterruptedException e){
e.printStackTrace();
}catch(ExecutionException e){
e.printStackTrace();
}
}
}
}
}


客户端源码

public class SampleClient {
private static final MessageFileDownloader DOWNLOADER;

static{
DOWNLOADER = new MessageFileDownloader("/home/viscent/tmp/incoming",
"192.168.1.105","datacenter","abc123");
DOWNLOADER.init();
}
public static void main(String[] args){
DOWNLOADER.downloadFile("abc.xml");
//执行其他操作
}
}


模式的考量

串行线程封闭模式典型应用的场景有以下两种

1.需要使用非线程安全对象,但又不希望引入锁

2.任务的执行涉及I/O操作,但我们不希望过多的I/O线程增加上下文切换

它可以帮助达成

1.异步编程。

2.不借助锁而实现线程安全。

但是如果客户端关系任务的处理结果,可以借用Promise模式,此时,可以让Serializer的service方法返回一个Promise实例,客户端代码可以通过该实例获取相应任务的处理结果。但是这样做要注意一点:多个客户端共享同一个Serializer实例意味着多个线程会等待同一个线程的处理结果,如果WorkerThread处理过慢,或者Serializer的service方法调用与获得任务处理结果之间的时间间隔太短,使得WorkerThread没有足够的时间执行相应任务,就可能导致客户端线程等待时间过长。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息