一个通用并发对象池的实现
转自:http://ifeve.com/generic-concurrent-object-pool/
原文链接,译文链接,原文作者:SarmaSwaranga,本文最早发表于deepinmind,校对:郑旭东
这篇文章里我们主要讨论下如何在Java里实现一个对象池。最近几年,Java虚拟机的性能在各方面都得到了极大的提升,因此对大多数对象而言,已经没有必要通过对象池来提高性能了。根本的原因是,创建一个新的对象的开销已经不像过去那样昂贵了。
然而,还是有些对象,它们的创建开销是非常大的,比如线程,数据库连接等这些非轻量级的对象。在任何一个应用程序里面,我们肯定会用到不止一个这样的对象。如果有一种很方便的创建管理这些对象的池,使得这些对象能够动态的重用,而客户端代码也不用关心它们的生命周期,还是会很给力的。
在真正开始写代码前,我们先来梳理下一个对象池需要完成哪些功能。
如果有可用的对象,对象池应当能返回给客户端。
客户端把对象放回池里后,可以对这些对象进行重用。
对象池能够创建新的对象来满足客户端不断增长的需求。
需要有一个正确关闭池的机制来确保关闭后不会发生内存泄露。
不用说了,上面几点就是我们要暴露给客户端的连接池的接口的基本功能。
我们的声明的接口如下:
04 | *Representsacachedpoolofobjects. |
08 | *@param<T>thetypeofobjecttopool. |
13 | *Returnsaninstancefromthepool. |
14 | *Thecallmaybeablockingoneoranon-blockingone |
15 | *andthatisdeterminedbytheinternalimplementation. |
17 | *Ifthecallisablockingcall, |
18 | *thecallreturnsimmediatelywithavalidobject |
19 | *ifavailable,elsethethreadismadetowait |
20 | *untilanobjectbecomesavailable. |
21 | *Incaseofablockingcall, |
22 | *itisadvisedthatclientsreact |
23 | *to{@linkInterruptedException}whichmightbethrown |
24 | *whenthethreadwaitsforanobjecttobecomeavailable. |
26 | *Ifthecallisanon-blockingone, |
27 | *thecallreturnsimmediatelyirrespectiveof |
28 | *whetheranobjectisavailableornot. |
29 | *Ifanyobjectisavailablethecallreturnsit |
30 | *elsethecallreturns<code>null</code>. |
32 | *Thevalidityoftheobjectsaredeterminedusingthe |
33 | *{@linkValidator}interface,suchthat |
34 | *anobject<code>o</code>isvalidif |
35 | *<code>Validator.isValid(o)==true</code>. |
37 | *@returnToneofthepooledobjects. |
42 | *Releasestheobjectandputsitbacktothepool. |
44 | *Themechanismofputtingtheobjectbacktothepoolis |
45 | *generallyasynchronous, |
46 | *howeverfutureimplementationsmightdiffer. |
48 | *@paramttheobjecttoreturntothepool |
54 | *Shutsdownthepool.Inessencethiscallwillnot |
56 | *andwillreleaseallresources. |
57 | *Releasingresourcesaredone |
58 | *viathe<code>invalidate()</code> |
59 | *methodofthe{@linkValidator}interface. |
为了能够支持任意对象,上面这个接口故意设计得很简单通用。它提供了从池里获取/返回对象的方法,还有一个关闭池的机制,以便释放对象。
现在我们来实现一下这个接口。开始动手之前,值得一提的是,一个理想的release方法应该先尝试检查下这个客户端返回的对象是否还能重复使用。如果是的话再把它扔回池里,如果不是,就舍弃掉这个对象。我们希望这个Pool接口的所有实现都能遵循这个规则。在开始具体的实现类前,我们先创建一个抽象类,以便限制后续的实现能遵循这点。我们实现的抽象类就叫做AbstractPool,它的定义如下:
04 | *Representsanabstractpool,thatdefinestheprocedure |
05 | *ofreturninganobjecttothepool. |
09 | *@param<T>thetypeofpooledobjects. |
11 | abstract class
AbstractPool<T> implements
Pool<T> |
14 | *Returnstheobjecttothepool. |
15 | *Themethodfirstvalidatestheobjectifitis |
16 | *re-usableandthenputsreturnsittothepool. |
18 | *Iftheobjectvalidationfails, |
20 | *willtrytocreateanewone |
21 | *andputitintothepool;however |
22 | *thisbehaviourissubjecttochange |
23 | *fromimplementationtoimplementation |
27 | public
final void release(Tt)
|
35 | handleInvalidReturn(t); |
39 | protected
abstract void handleInvalidReturn(Tt);
|
41 | protected
abstract void returnToPool(Tt);
|
43 | protected
abstract boolean isValid(Tt);
|
在上面这个类里,我们让对象池必须得先验证对象后才能把它放回到池里。具体的实现可以自由选择如何实现这三种方法,以便定制自己的行为。它们根据自己的逻辑来决定如何判断一个对象有效,无效的话应该怎么处理(handleInvalidReturn方法),怎么把一个有效的对象放回到池里(returnToPool方法)。
有了上面这几个类,我们就可以着手开始具体的实现了。不过还有个问题,由于上面这些类是设计成能支持通用的对象池的,因此具体的实现不知道该如何验证对象的有效性(因为对象都是泛型的)。因此我们还需要些别的东西来帮助我们完成这个。
我们需要一个通用的方法来完成对象的校验,而具体的实现不必关心对象是何种类型。因此我们引入了一个新的接口,Validator,它定义了验证对象的方法。这个接口的定义如下:
04 | *Representsthefunctionalityto |
05 | *validateanobjectofthepool |
06 | *andtosubsequentlyperformcleanupactivities. |
10 | *@param<T>thetypeofobjectstovalidateandcleanup. |
12 | public
static interface Validator<T>
|
15 | *Checkswhethertheobjectisvalid. |
17 | *@paramttheobjecttocheck. |
19 | *@return<code>true</code> |
20 | *iftheobjectisvalidelse<code>false</code>. |
22 | public
boolean isValid(Tt); |
25 | *Performsanycleanupactivities |
26 | *beforediscardingtheobject. |
27 | *Forexamplebeforediscarding |
28 | *databaseconnectionobjects, |
29 | *thepoolwillwanttoclosetheconnections. |
31 | *<code>invalidate()</code>method. |
33 | *@paramttheobjecttocleanup |
36 | public
void invalidate(Tt); |
上面这个接口定义了一个检验对象的方法,以及一个把对象置为无效的方法。当准备废弃一个对象并清理内存的时候,invalidate方法就派上用场了。值得注意的是这个接口本身没有任何意义,只有当它在对象池里使用的时候才有意义,所以我们把这个接口定义到Pool接口里面。这和Java集合库里的Map和Map.Entry是一样的。所以我们的Pool接口就成了这样:
04 | *Representsacachedpoolofobjects. |
08 | *@param<T>thetypeofobjecttopool. |
13 | *Returnsaninstancefromthepool. |
14 | *Thecallmaybeablockingoneoranon-blockingone |
15 | *andthatisdeterminedbytheinternalimplementation. |
17 | *Ifthecallisablockingcall, |
18 | *thecallreturnsimmediatelywithavalidobject |
19 | *ifavailable,elsethethreadismadetowait |
20 | *untilanobjectbecomesavailable. |
21 | *Incaseofablockingcall, |
22 | *itisadvisedthatclientsreact |
23 | *to{@linkInterruptedException}whichmightbethrown |
24 | *whenthethreadwaitsforanobjecttobecomeavailable. |
26 | *Ifthecallisanon-blockingone, |
27 | *thecallreturnsimmediatelyirrespectiveof |
28 | *whetheranobjectisavailableornot. |
29 | *Ifanyobjectisavailablethecallreturnsit |
30 | *elsethecallreturns<code>null</code>. |
32 | *Thevalidityoftheobjectsaredeterminedusingthe |
33 | *{@linkValidator}interface,suchthat |
34 | *anobject<code>o</code>isvalidif |
35 | *<code>Validator.isValid(o)==true</code>. |
37 | *@returnToneofthepooledobjects. |
42 | *Releasestheobjectandputsitbacktothepool. |
44 | *Themechanismofputtingtheobjectbacktothepoolis |
45 | *generallyasynchronous, |
46 | *howeverfutureimplementationsmightdiffer. |
48 | *@paramttheobjecttoreturntothepool |
54 | *Shutsdownthepool.Inessencethiscallwillnot |
56 | *andwillreleaseallresources. |
57 | *Releasingresourcesaredone |
58 | *viathe<code>invalidate()</code> |
59 | *methodofthe{@linkValidator}interface. |
65 | *Representsthefunctionalityto |
66 | *validateanobjectofthepool |
67 | *andtosubsequentlyperformcleanupactivities. |
71 | *@param<T>thetypeofobjectstovalidateandcleanup. |
73 | public
static interface Validator<T>
|
76 | *Checkswhethertheobjectisvalid. |
78 | *@paramttheobjecttocheck. |
80 | *@return<code>true</code> |
81 | *iftheobjectisvalidelse<code>false</code>. |
83 | public
boolean isValid(Tt); |
86 | *Performsanycleanupactivities |
87 | *beforediscardingtheobject. |
88 | *Forexamplebeforediscarding |
89 | *databaseconnectionobjects, |
90 | *thepoolwillwanttoclosetheconnections. |
92 | *<code>invalidate()</code>method. |
94 | *@paramttheobjecttocleanup |
97 | public
void invalidate(Tt); |
准备工作已经差不多了,在最后开始前我们还需要一个终极武器,这才是这个对象池的杀手锏。就是“能够创建新的对象”。我们的对象池是泛型的,因此它们得知道如何去生成新的对象来填充这个池子。这个功能不能依赖于对象池本身,必须要有一个通用的方式来创建新的对象。通过一个ObjectFactory的接口就能完成这个,它只有一个“如何创建新的对象”的方法。我们的ObjectFactory接口如下:
04 | *Representsthemechanismtocreate |
05 | *newobjectstobeusedinanobjectpool. |
09 | *@param<T>thetypeofobjecttocreate. |
11 | public interface
ObjectFactory<T> |
14 | *ReturnsanewinstanceofanobjectoftypeT. |
16 | *@returnTannewinstanceoftheobjectoftypeT |
18 | public
abstract TcreateNew(); |
我们的工具类都已经搞定了,现在可以开始真正实现我们的Pool接口了。因为我们希望这个池能在并发程序里面使用,所以我们会创建一个阻塞的对象池,当没有对象可用的时候,让客户端先阻塞住。我们的阻塞机制是让客户端一直阻塞直到有对象可用为止。这样的话导致我们还需要再增加一个只阻塞一定时间的方法,如果在超时时间到来前有对象可用则返回,如果超时了就返回null而不是一直等待下去。这样的实现有点类似Java并发库里的LinkedBlockingQueue,因此真正实现前我们再暴露一个接口,BlockingPool,类似于Java并发库里的BlockingQueue接口。
这里是BlockingQueue的声明:
03 | import java.util.concurrent.TimeUnit; |
06 | *Representsapoolofobjectsthatmakesthe |
07 | *requestingthreadswaitifnoobjectisavailable. |
11 | *@param<T>thetypeofobjectstopool. |
13 | public interface
BlockingPool<T> extends
Pool<T> |
16 | *ReturnsaninstanceoftypeTfromthepool. |
18 | *Thecallisablockingcall, |
19 | *andclientthreadsaremadetowait |
20 | *indefinitelyuntilanobjectisavailable. |
21 | *Thecallimplementsafairnessalgorithm |
22 | *thatensuresthataFCFSserviceisimplemented. |
24 | *ClientsareadvisedtoreacttoInterruptedException. |
25 | *Ifthethreadisinterruptedwhilewaiting |
26 | *foranobjecttobecomeavailable, |
27 | *thecurrentimplementations |
28 | *setstheinterruptedstateofthethread |
29 | *to<code>true</code>andreturnsnull. |
30 | *Howeverthisissubjecttochange |
31 | *fromimplementationtoimplementation. |
33 | *@returnTaninstanceoftheObject |
39 | *ReturnsaninstanceoftypeTfromthepool, |
41 | *specifiedwaittimeifnecessary |
42 | *foranobjecttobecomeavailable.. |
44 | *Thecallisablockingcall, |
45 | *andclientthreadsaremadetowait |
46 | *fortimeuntilanobjectisavailable |
47 | *oruntilthetimeoutoccurs. |
48 | *Thecallimplementsafairnessalgorithm |
49 | *thatensuresthataFCFSserviceisimplemented. |
51 | *ClientsareadvisedtoreacttoInterruptedException. |
52 | *Ifthethreadisinterruptedwhilewaiting |
53 | *foranobjecttobecomeavailable, |
54 | *thecurrentimplementations |
55 | *settheinterruptedstateofthethread |
56 | *to<code>true</code>andreturnsnull. |
57 | *Howeverthisissubjecttochange |
58 | *fromimplementationtoimplementation. |
61 | *@paramtimeamountoftimetowaitbeforegivingup, |
62 | *inunitsof<tt>unit</tt> |
63 | *@paramunita<tt>TimeUnit</tt>determining |
65 | *<tt>timeout</tt>parameter |
67 | *@returnTaninstanceoftheObject |
70 | *@throwsInterruptedException |
71 | *ifinterruptedwhilewaiting |
74 | Tget( long
time,TimeUnitunit) throws
InterruptedException; |
BoundedBlockingPool的实现如下:
003 | import java.util.concurrent.BlockingQueue; |
004 | import java.util.concurrent.Callable; |
005 | import java.util.concurrent.ExecutorService; |
006 | import java.util.concurrent.Executors; |
007 | import java.util.concurrent.LinkedBlockingQueue; |
008 | import java.util.concurrent.TimeUnit; |
009 | public final
class BoundedBlockingPool |
011 | implements
<BlockingPool> |
014 | private
BlockingQueueobjects; |
015 | private
Validatorvalidator; |
016 | private
ObjectFactoryobjectFactory; |
017 | private
ExecutorServiceexecutor= |
018 | Executors.newCachedThreadPool(); |
019 | private
volatile boolean shutdownCalled; |
021 | public
BoundedBlockingPool( |
024 | ObjectFactoryobjectFactory) |
027 | this .objectFactory=objectFactory; |
029 | this .validator=validator; |
030 | objects=
new LinkedBlockingQueue(size); |
035 | public
Tget( long timeOut,TimeUnitunit)
|
042 | t=objects.poll(timeOut,unit); |
045 | catch (InterruptedExceptionie) |
047 | Thread.currentThread().interrupt(); |
051 | throw
new IllegalStateException( |
052 | 'Objectpoolisalreadyshutdown' ); |
065 | catch (InterruptedExceptionie) |
067 | Thread.currentThread().interrupt(); |
072 | throw
new IllegalStateException( |
073 | 'Objectpoolisalreadyshutdown' ); |
079 | executor.shutdownNow(); |
083 | private
void clearResources() |
087 | validator.invalidate(t); |
092 | protected
void returnToPool(Tt) |
094 | if (validator.isValid(t)) |
096 | executor.submit( new
ObjectReturner(objects,t)); |
101 | protected
void handleInvalidReturn(Tt) |
106 | protected
boolean isValid(Tt) |
108 | return
validator.isValid(t); |
111 | private
void initializeObjects() |
113 | for ( int
i= 0 ;i<size;i++) |
115 | objects.add(objectFactory.createNew()); |
119 | private
class ObjectReturner |
122 | private
BlockingQueuequeue; |
124 | public
ObjectReturner(BlockingQueuequeue,Ee) |
139 | catch (InterruptedExceptionie) |
141 | Thread.currentThread().interrupt(); |
上面是一个非常基本的对象池,它内部是基于一个LinkedBlockingQueue来实现的。这里唯一比较有意思的方法就是returnToPool。因为内部的存储是一个LinkedBlockingQueue实现的,如果我们直接把返回的对象扔进去的话,如果队列已满可能会阻塞住客户端。不过我们不希望客户端因为把对象放回池里这么个普通的方法就阻塞住了。所以我们把最终将对象插入到队列里的任务作为一个异步的的任务提交给一个Executor来执行,以便让客户端线程能立即返回。
现在我们将在自己的代码中使用上面这个对象池,用它来缓存数据库连接。我们需要一个校验器来验证数据库连接是否有效。
下面是这个JDBCConnectionValidator:
03 | import java.sql.Connection; |
04 | import java.sql.SQLException; |
05 | import com.test.pool.Pool.Validator; |
06 | public final
class JDBCConnectionValidator
implements Validator<Connection> |
08 | public
boolean isValid(Connectioncon) |
26 | public
void invalidate(Connectioncon) |
还有一个JDBCObjectFactory,它将用来生成新的数据库连接对象:
03 | import java.sql.Connection; |
04 | import java.sql.DriverManager; |
05 | import java.sql.SQLException; |
06 | import com.test.pool.ObjectFactory; |
07 | public class
JDBCConnectionFactory implements
ObjectFactory<Connection> |
09 | private
StringconnectionURL; |
12 | public
JDBCConnectionFactory( |
22 | catch (ClassNotFoundExceptionce) |
24 | throw
new IllegalArgumentException( 'Unabletofinddriverinclasspath' ,ce); |
26 | this .connectionURL=connectionURL; |
27 | this .userName=userName; |
28 | this .password=password; |
31 | public
ConnectioncreateNew() |
35 | return
DriverManager.getConnection( |
42 | throw
new IllegalArgumentException( 'Unabletocreatenewconnection' ,se); |
现在我们用上述的Validator和ObjectFactory来创建一个JDBC的连接池:
03 | import java.sql.Connection; |
04 | import com.test.pool.Pool; |
05 | import com.test.pool.PoolFactory; |
09 | public
static void main(String[]args)
|
12 | new
BoundedBlockingPool<Connection>( |
14 | new
JDBCConnectionValidator(), |
15 | new
JDBCConnectionFactory( '' , '' , '' , '' ) |
为了犒劳下能读完整篇文章的读者,我这再提供另一个非阻塞的对象池的实现,这个实现和前面的唯一不同就是即使对象不可用,它也不会让客户端阻塞,而是直接返回null。具体的实现在这:
03 | import java.util.LinkedList; |
05 | import java.util.concurrent.Semaphore; |
07 | public class
BoundedPool<T> extends
AbstractPool<T> |
10 | private
Queue<T>objects; |
11 | private
Validator<T>validator; |
12 | private
ObjectFactory<T>objectFactory; |
13 | private
Semaphorepermits; |
14 | private
volatile boolean shutdownCalled; |
19 | ObjectFactory<T>objectFactory) |
22 | this .objectFactory=objectFactory; |
24 | this .validator=validator; |
25 | objects=
new LinkedList<T>(); |
37 | if (permits.tryAcquire()) |
45 | throw
new IllegalStateException( 'Objectpoolalreadyshutdown' ); |
57 | private
void clearResources() |
61 | validator.invalidate(t); |
66 | protected
void returnToPool(Tt) |
68 | boolean
added=objects.add(t); |
76 | protected
void handleInvalidReturn(Tt) |
81 | protected
boolean isValid(Tt) |
83 | return
validator.isValid(t); |
86 | private
void initializeObjects() |
90 | objects.add(objectFactory.createNew()); |
考虑到我们现在已经有两种实现,非常威武了,得让用户通过工厂用具体的名称来创建不同的对象池了。工厂来了:
03 | import com.test.pool.Pool.Validator; |
07 | *Factoryandutilitymethodsfor |
09 | *{@linkPool}and{@linkBlockingPool}classes |
12 | *Thisclasssupportsthefollowingkindsofmethods: |
17 | <li>Methodthatcreatesandreturnsadefaultnon-blocking |
18 | *implementationofthe{@linkPool}interface. |
22 | <li>Methodthatcreatesandreturnsa |
23 | *defaultimplementationof |
24 | *the{@linkBlockingPool}interface. |
31 | public final
class PoolFactory |
38 | *Createsaandreturnsanewobjectpool, |
39 | *thatisanimplementationofthe{@linkBlockingPool}, |
41 | *the<tt>size</tt>parameter. |
43 | *@paramsizethenumberofobjectsinthepool. |
44 | *@paramfactorythefactorytocreatenewobjects. |
45 | *@paramvalidatorthevalidatorto |
46 | *validatethere-usabilityofreturnedobjects. |
48 | *@returnablockingobjectpool |
49 | *boundedby<tt>size</tt> |
52 | newBoundedBlockingPool( |
54 | ObjectFactory<T>factory, |
57 | return
new BoundedBlockingPool<T>( |
63 | *Createsaandreturnsanewobjectpool, |
64 | *thatisanimplementationofthe{@linkPool} |
66 | *bythe<tt>size</tt>parameter. |
68 | *@paramsizethenumberofobjectsinthepool. |
69 | *@paramfactorythefactorytocreatenewobjects. |
70 | *@paramvalidatorthevalidatortovalidate |
71 | *there-usabilityofreturnedobjects. |
73 | *@returnanobjectpoolboundedby<tt>size</tt> |
75 | public static
<T>Pool<T>newBoundedNonBlockingPool( |
77 | ObjectFactory<T>factory, |
80 | return
new BoundedPool<T>(size,validator,factory); |
现在我们的客户端就能用一种可读性更强的方式来创建对象池了:
03 | import java.sql.Connection; |
04 | import com.test.pool.Pool; |
05 | import com.test.pool.PoolFactory; |
09 | public
static void main(String[]args)
|
12 | PoolFactory.newBoundedBlockingPool( |
14 | new
JDBCConnectionFactory( '' , '' , '' , '' ), |
15 | new
JDBCConnectionValidator()); |
好吧,终于写完了,拖了这么久了。尽情使用和完善它吧,或者再多加几种实现。
快乐编码,快乐分享!