多线程设计模式——Thread Confinement(串行线程封闭)模式
2016-07-12 07:19
393 查看
这些都是根据我最近看的《Java实战指南多线程编程(设计模式篇)》所得整理。
Serializer:负责对外暴露服务接口以及工作者线程的生命管理,并将客户端对其的并发服务调用转换为相应的任务以实现服务串行化。Service表示SerialThreadConfinerment模式对外暴露的服务方法,该方法将客户端对其的并发调用串行化,init初始化SerialThreadConfinerment模式对外暴露的服务,该方法可以启动工作者线程,shutdown停止SerialThreadConfinerment模式对外暴露的服务,该方法可以停止工作者线程。
WokerThrea:工作者线程,负责接收Serializer提交的并发任务以及这些任务的执行。Submit用于Serializer提交并发任务,并将这些任务串行化,dispath用于执行串行化任务。
Queue:Serializer和WorkerThread间的缓冲区,用于实现并发任务的串行化enqueue用于并发任务入队列,dequeue用于任务出队列。
NonThreadSafeObject工作者线程执行任务时所需访问的非线程安全对象。
Created with Raphaël 2.1.0ClientClientSerializerSerializerWorkerThreadWorkerThreadSeralizerSeralizerQueueQueueNonThreadSafeObjectNonThreadSafeObject1init()2create3start()456service()7submit()8enqueue()9101112dispatch13dispatch()1415create1617
文件下载服务类源码
客户端源码
1.需要使用非线程安全对象,但又不希望引入锁
2.任务的执行涉及I/O操作,但我们不希望过多的I/O线程增加上下文切换
它可以帮助达成
1.异步编程。
2.不借助锁而实现线程安全。
但是如果客户端关系任务的处理结果,可以借用Promise模式,此时,可以让Serializer的service方法返回一个Promise实例,客户端代码可以通过该实例获取相应任务的处理结果。但是这样做要注意一点:多个客户端共享同一个Serializer实例意味着多个线程会等待同一个线程的处理结果,如果WorkerThread处理过慢,或者Serializer的service方法调用与获得任务处理结果之间的时间间隔太短,使得WorkerThread没有足够的时间执行相应任务,就可能导致客户端线程等待时间过长。
模式名称
Thread Confinement(串行线程封闭)模式模式解决的问题
如果并发任务的执行涉及某个非线程安全对象,而很多时候我们又不希望因此而引入锁。解决思路
Thread Confinement(串行线程封闭)模式的核心思想是通过将多个并发的任务存入队列实现任务的串行化,并为这些串行化为任务创建唯一的工作者线程进行处理。Serializer:负责对外暴露服务接口以及工作者线程的生命管理,并将客户端对其的并发服务调用转换为相应的任务以实现服务串行化。Service表示SerialThreadConfinerment模式对外暴露的服务方法,该方法将客户端对其的并发调用串行化,init初始化SerialThreadConfinerment模式对外暴露的服务,该方法可以启动工作者线程,shutdown停止SerialThreadConfinerment模式对外暴露的服务,该方法可以停止工作者线程。
WokerThrea:工作者线程,负责接收Serializer提交的并发任务以及这些任务的执行。Submit用于Serializer提交并发任务,并将这些任务串行化,dispath用于执行串行化任务。
Queue:Serializer和WorkerThread间的缓冲区,用于实现并发任务的串行化enqueue用于并发任务入队列,dequeue用于任务出队列。
NonThreadSafeObject工作者线程执行任务时所需访问的非线程安全对象。
Created with Raphaël 2.1.0ClientClientSerializerSerializerWorkerThreadWorkerThreadSeralizerSeralizerQueueQueueNonThreadSafeObjectNonThreadSafeObject1init()2create3start()456service()7submit()8enqueue()9101112dispatch13dispatch()1415create1617
例子代码
某系统需要支持从内网的某台FTP服务器上下载一批文件的功能,该功能的实现会用到一款开元的FTP组件,该组件并非线程安全,所以需要用到串行线程封闭模式。文件下载服务类源码
public class MessageFileDownloader { //模式角色SerialThreadConfiment WorkerThread private final WorkerThread workerThread; public MessageFileDownlader(String outputDir,final String ftpServer, final String userName,final String password){ workerThread = new WorkerThread(outputDir,ftpServer,userName,password); } public void init(){ workerThread.start(); } public void shutdown(){ workerThread.terminate(); } public void downloadFile(String file){ workerThread.download(file); } //模式角色:SerialThreadConfiment.WorkerThread private static class WorkerThread extends AbstractTerminatableThread{ //模式角色:SerialThreadCOnfiment.Queue private final BlockingQueue<String>workQueue; private final Future<FPClient>ftpClientPromise; private final String outputDir; public WorkerThread(String outputDir,final String ftpServer, final String userName,final String password){ this.workQueue = new ArrayBlockingQueue<Stirng>(100); this.outputDir = outputDir + '/'; this.ftpClientPromise = new FutureTask<FTPClient>(){ new Callable<FTPClient>(){ @Override public FTPClient call() throws Exception{ FTPClient ftpClient = initFTPClient(ftpServer,userName, password); return ftpClient; } } }; new Thread((FutureTask<FTPClient>) ftpClientPromise).start(); } public void download(String file){ try{ workQueue.put(file); }catch(InterruptedException e){ ; } } private FTPClient initFTPClient(String ftpServer,String userName, String password) throws Exception{ FtpClient ftpClient = new FTPClient(); FTPClientConfig config = new FTPClientConfig(); ftpClient.configure(config); int reply; ftpClient.connect(ftpServer); System.out.print(ftpClient.getReplyString()); reply = ftpClient.getReplyCode(); if(!FTPReply.isPositiveCompletion(reply)){ ftpClient.disconnect(); throw new RuntimeException("FTP server refused connection."); } boolean isOK = ftpClient.login(userName, password); if(isOK){ System.out.println(ftpClient.getReplyString(); }else{ throw new RuntimeException("Failed to login." + ftpClient.getRepleyString()); } reply = ftpClient.cwd("`/messages"); if(!FTPReply.isPositiveCompletion(reply)){ ftpClient.disconnect(); throw new RuntimeException("Failed to change working" + "directory.reply:" + reply); }else{ System.out.println(ftpClient.getReplyString()); } ftpClient.setFileype(FTP.ASCII_FILE_TYPE); return ftpClient; } @Override protected void doRun() throws Exception{ String file =workQueue.take(); OutputStream os = null; try{ os = new BufferedOutputStream(new FileOutputStream(outputDir + file)); }finally{ if(null != os){ try{ os.close(); }catch(Exception e){ e.printStackTrace(); } } terminationToken.reservations.decrementAndGet(); } protected void doCleanup(Exception cause){ try{ ftpClientPromise.get().diaconnect(); }catch(IOException e){ e.printStackTrace(); }catch(InterruptedException e){ e.printStackTrace(); }catch(ExecutionException e){ e.printStackTrace(); } } } } }
客户端源码
public class SampleClient { private static final MessageFileDownloader DOWNLOADER; static{ DOWNLOADER = new MessageFileDownloader("/home/viscent/tmp/incoming", "192.168.1.105","datacenter","abc123"); DOWNLOADER.init(); } public static void main(String[] args){ DOWNLOADER.downloadFile("abc.xml"); //执行其他操作 } }
模式的考量
串行线程封闭模式典型应用的场景有以下两种1.需要使用非线程安全对象,但又不希望引入锁
2.任务的执行涉及I/O操作,但我们不希望过多的I/O线程增加上下文切换
它可以帮助达成
1.异步编程。
2.不借助锁而实现线程安全。
但是如果客户端关系任务的处理结果,可以借用Promise模式,此时,可以让Serializer的service方法返回一个Promise实例,客户端代码可以通过该实例获取相应任务的处理结果。但是这样做要注意一点:多个客户端共享同一个Serializer实例意味着多个线程会等待同一个线程的处理结果,如果WorkerThread处理过慢,或者Serializer的service方法调用与获得任务处理结果之间的时间间隔太短,使得WorkerThread没有足够的时间执行相应任务,就可能导致客户端线程等待时间过长。
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- Python3写爬虫(四)多线程实现数据爬取
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序