深入了解DefaultMessageListenerContainer
2013-06-25 17:23
246 查看
http://www.myexception.cn/ai/1252456.html
深入理解DefaultMessageListenerContainer
DefaultMessageListenerContainer是一个用于异步消息监听的管理类。
DefaultMessageListenerContainer最简单的实现逻辑,一个任务执行器,执行任务(即消息监听)。
DefaultMessageListenerContainer实现的主要原理是,通过内部初始化建立的一个taskExecutor(默认是 SimpleAsyncTaskExecutor)用于执行消息监听的任务(AsyncMessageListenerInvoker)。
这里默认的任务执行器是SimpleAsyncTaskExecutor,这个执行器的缺点是不会重用连接,也就是对于每个任务都需要新开启一个线程,执行完任务后会关闭它。如果要优化的话可以考虑使用线程池。
消息监听的任务被抽象成AsyncMessageListenerInvoker类,这个类实现了Runnable接口,内部run方法其实是通过不断循环MessageConsumer.receive()方法来实现监听。
事实上一个消费者对应了一个AsyncMessageListenerInvoker任务,每个任务需要一个单独的线程去执行它。这个AsyncMessageListenerInvoker实例被放在了一个名为scheduledInvokers的set里面。
其实我们还有一个比较关心的地方是这个DefaultMessageListenerContainer缓不缓存 connection,session,consumer。因为它不像消息发送一样可以使用PooledConnectionFactory或者CachingConnectionFactory。它是根据catchLevel属性来决定是否缓存 connection,session,consumer。默认的catchLevel对应常量CATCH_AUTO=4,即由配置的外部事务管理器决定。 catchLevel级别分别是CATCH_NONE,CATCH_CONNECTION,CATCH_SESSION,CATCH_CONSUMER,CATCH_AUTO,
分别对应0,1,2,3,4。我试了下默认的CATCH_AUTO在没有定义事务管理时值为 CATCH_CONSUMER,即3。
具体查看类中的方法:
publicvoid initialize(){
// Adapt default cache level.
if(this.cacheLevel
== CACHE_AUTO){
this.cacheLevel
=(getTransactionManager()!=null?
CACHE_NONE : CACHE_CONSUMER);
}
// Prepare taskExecutor and maxMessagesPerTask.
synchronized(this.lifecycleMonitor){
if(this.taskExecutor
==null){
this.taskExecutor
= createDefaultTaskExecutor();
}
elseif(this.taskExecutor
instanceofSchedulingTaskExecutor&&
((SchedulingTaskExecutor)this.taskExecutor).prefersShortLivedTasks()&&
this.maxMessagesPerTask
==Integer.MIN_VALUE){
// TaskExecutor indicated a preference for short-lived tasks. According to
// setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
// unless the user specified a custom value.
this.maxMessagesPerTask
=10;
}
}
// Proceed with actual listener initialization.
super.initialize();
}
CATCH_NONE=0,表示不缓存JMS任何资源。
CATCH_CONNECTION=1,表示只缓存JMS的共享Connection。
CATCH_SESSION=2,表示缓存JMS的共享Connection和Session。
CATCH_CONSUMER=3,表示缓存JMS的共享Connection和Session还有MessageConsumer。
CATCH_AUTO=4,表示系统自动选择一个合适的cacheLevel(根据事务管理策略)。
DefaultMessageListenerContainer会根据catchLevel来缓存共享connection,session, 及consumer。值为3的话就会缓存connection,session,及consumer,在初始化的时候就会调用父类 AbstractJmsListeningContainer的doStart()方法,判断cacheLevel是否大于等于1,如果大于就创建一个 connection将放入成员变量sharedConnection中。
每个任务被执行的时候(即责任是监听消息),会先去获取connection,session及consumer(通过调用 initResourcesIfNecessary方法)就像我们自己最初实现一个简单的客户端消费者一样。只不过这里会根据catchLevel来决定 是否缓存session及consumer。被缓存了的session及consumer放在对应的成员变量里面。
接着任务会想要执行consumer.recieve方法,这之前肯定要获取connection,session及consumer,如果已有 connection,session及consumer则获取过来,如果没有则通过配置的信息新建。执行完consumer.recieve后,会判断 consumer.recieve返回的消息是否为空。
不为空则调用message对应的messageListner(之前我们在DefaultMessageListenerContainer中通过方法 setMessageListner设置的)的onMessage执行相应的逻辑,并设置这个任务的Idle为false,表明这个任务不是空闲的,然后 会调用方法判断是否应该新建任务实例,这个受限于
任务执行完后,会在finally处释放connection,session及consumer。这个是根据上述讲的catchLevel来设置的。
继承体系如下:
AbstractJmsListeningContainer提供了一个最上层最基础的jms消息监听管理类所应该有的方法。提供了start(启动这个 管理类),stop,initialize(初始化这个管理类),establishSharedConnection等。
http://tech.techweb.com.cn/thread-535513-1-1.html
使用DefaultMessageListenerContainer作为消息接收器,典型的配置如下:
) z/ a) z* f& L9 e5 u: t2 _" {1 ^' s, T- z
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="mqConnectionFactory" />
' K7 [( R8 |2 I& I( u; j* l) j
<property name="destination" ref="queueIn" /><!-- 接收队列 -->
<property name="concurrentConsumers" value="3" /><!-- 控制同时启几个concurrent listener threads -->
' _2 @! f) [3 ] I% l4 [
<property name="messageListener" ref="messageReceiver" />
4 W1 b. O- i% O4 y4 U, c% x
<property name="transactionManager" ref="jmsTransactionManager" />
5 K8 R3 x0 |; c4 }; I% z
<property name="sessionTransacted" value="true" />
1 o x- I4 A+ r1 L/ s
</bean>
<bean id="jmsTransactionManager"
( N/ r+ H: d( i$ A" Y' t
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="mqConnectionFactory" />
& H7 r1 A0 j) C- A
</bean>
深入理解DefaultMessageListenerContainer
DefaultMessageListenerContainer是一个用于异步消息监听的管理类。
DefaultMessageListenerContainer最简单的实现逻辑,一个任务执行器,执行任务(即消息监听)。
DefaultMessageListenerContainer实现的主要原理是,通过内部初始化建立的一个taskExecutor(默认是 SimpleAsyncTaskExecutor)用于执行消息监听的任务(AsyncMessageListenerInvoker)。
这里默认的任务执行器是SimpleAsyncTaskExecutor,这个执行器的缺点是不会重用连接,也就是对于每个任务都需要新开启一个线程,执行完任务后会关闭它。如果要优化的话可以考虑使用线程池。
消息监听的任务被抽象成AsyncMessageListenerInvoker类,这个类实现了Runnable接口,内部run方法其实是通过不断循环MessageConsumer.receive()方法来实现监听。
事实上一个消费者对应了一个AsyncMessageListenerInvoker任务,每个任务需要一个单独的线程去执行它。这个AsyncMessageListenerInvoker实例被放在了一个名为scheduledInvokers的set里面。
其实我们还有一个比较关心的地方是这个DefaultMessageListenerContainer缓不缓存 connection,session,consumer。因为它不像消息发送一样可以使用PooledConnectionFactory或者CachingConnectionFactory。它是根据catchLevel属性来决定是否缓存 connection,session,consumer。默认的catchLevel对应常量CATCH_AUTO=4,即由配置的外部事务管理器决定。 catchLevel级别分别是CATCH_NONE,CATCH_CONNECTION,CATCH_SESSION,CATCH_CONSUMER,CATCH_AUTO,
分别对应0,1,2,3,4。我试了下默认的CATCH_AUTO在没有定义事务管理时值为 CATCH_CONSUMER,即3。
具体查看类中的方法:
publicvoid initialize(){
// Adapt default cache level.
if(this.cacheLevel
== CACHE_AUTO){
this.cacheLevel
=(getTransactionManager()!=null?
CACHE_NONE : CACHE_CONSUMER);
}
// Prepare taskExecutor and maxMessagesPerTask.
synchronized(this.lifecycleMonitor){
if(this.taskExecutor
==null){
this.taskExecutor
= createDefaultTaskExecutor();
}
elseif(this.taskExecutor
instanceofSchedulingTaskExecutor&&
((SchedulingTaskExecutor)this.taskExecutor).prefersShortLivedTasks()&&
this.maxMessagesPerTask
==Integer.MIN_VALUE){
// TaskExecutor indicated a preference for short-lived tasks. According to
// setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
// unless the user specified a custom value.
this.maxMessagesPerTask
=10;
}
}
// Proceed with actual listener initialization.
super.initialize();
}
CATCH_NONE=0,表示不缓存JMS任何资源。
CATCH_CONNECTION=1,表示只缓存JMS的共享Connection。
CATCH_SESSION=2,表示缓存JMS的共享Connection和Session。
CATCH_CONSUMER=3,表示缓存JMS的共享Connection和Session还有MessageConsumer。
CATCH_AUTO=4,表示系统自动选择一个合适的cacheLevel(根据事务管理策略)。
DefaultMessageListenerContainer会根据catchLevel来缓存共享connection,session, 及consumer。值为3的话就会缓存connection,session,及consumer,在初始化的时候就会调用父类 AbstractJmsListeningContainer的doStart()方法,判断cacheLevel是否大于等于1,如果大于就创建一个 connection将放入成员变量sharedConnection中。
每个任务被执行的时候(即责任是监听消息),会先去获取connection,session及consumer(通过调用 initResourcesIfNecessary方法)就像我们自己最初实现一个简单的客户端消费者一样。只不过这里会根据catchLevel来决定 是否缓存session及consumer。被缓存了的session及consumer放在对应的成员变量里面。
接着任务会想要执行consumer.recieve方法,这之前肯定要获取connection,session及consumer,如果已有 connection,session及consumer则获取过来,如果没有则通过配置的信息新建。执行完consumer.recieve后,会判断 consumer.recieve返回的消息是否为空。
不为空则调用message对应的messageListner(之前我们在DefaultMessageListenerContainer中通过方法 setMessageListner设置的)的onMessage执行相应的逻辑,并设置这个任务的Idle为false,表明这个任务不是空闲的,然后 会调用方法判断是否应该新建任务实例,这个受限于
MaxConcurrentConsumers及
IdleTaskExecutionLimit。为空则不需要特别处理,只需调用noMessageReceived方法将idle标记设为true。
任务执行完后,会在finally处释放connection,session及consumer。这个是根据上述讲的catchLevel来设置的。
继承体系如下:
AbstractJmsListeningContainer提供了一个最上层最基础的jms消息监听管理类所应该有的方法。提供了start(启动这个 管理类),stop,initialize(初始化这个管理类),establishSharedConnection等。
http://tech.techweb.com.cn/thread-535513-1-1.html
使用DefaultMessageListenerContainer作为消息接收器,典型的配置如下:
) z/ a) z* f& L9 e5 u: t2 _" {1 ^' s, T- z
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="mqConnectionFactory" />
' K7 [( R8 |2 I& I( u; j* l) j
<property name="destination" ref="queueIn" /><!-- 接收队列 -->
<property name="concurrentConsumers" value="3" /><!-- 控制同时启几个concurrent listener threads -->
' _2 @! f) [3 ] I% l4 [
<property name="messageListener" ref="messageReceiver" />
4 W1 b. O- i% O4 y4 U, c% x
<property name="transactionManager" ref="jmsTransactionManager" />
5 K8 R3 x0 |; c4 }; I% z
<property name="sessionTransacted" value="true" />
1 o x- I4 A+ r1 L/ s
</bean>
<bean id="jmsTransactionManager"
( N/ r+ H: d( i$ A" Y' t
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="mqConnectionFactory" />
& H7 r1 A0 j) C- A
</bean>
相关文章推荐
- 深入理解DefaultMessageListenerContainer
- 深入理解DefaultMessageListenerContainer
- DefaultMessageListenerContainer
- DefaultMessageListenerContainer事务相关问题
- 【spring-jms】DefaultMessageListenerContainer
- DefaultMessageListenerContainer
- spring jms DefaultMessageListenerContainer分析
- DefaultMessageListenerContainer
- Spring整合Activemq中的DefaultMessageListenerContainer
- DefaultMessageListenerContainer
- JMS与Spring之二(用message listener container异步收发消息)
- Android 异步消息处理机制 让你在深入了解 Looper、Handler、Message之间的关系
- 深入了解Looper、Handler、Message之间关系
- JMS与Spring之二(用message listener container异步收发消息)
- 深入掌握JMS(三):MessageListener
- JMS与Spring之二(用message listener container异步收发消息)
- JMS与Spring之二(用message listener container异步收发消息)
- 深入了解preventDefault与stopPropagation
- JMS(Jboss Messaging)的一点使用心得(五)Spring扩展应用-可自动重连的JmsMessageListenerContainer
- JMS(Jboss Messaging)的一点使用心得(十一)Spring扩展应用-可自动重连的JmsMessageListenerContainer的另一种实现