Spring源码分析总结——Spring整合activeMQ
2018-03-15 15:43
441 查看
该文章基于《Spring源码深度解析》撰写,感谢郝佳老师的奉献
承接上文activeMQ的使用,下面来进行源码分析。
![](//img-blog.csdn.net/2018031511205193?watermark/2/text/Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDEzNDM2Nw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
虽然出现了常见的InitializingBean接口,但是该接口实际上只是一个验证的功能。
下面来看看实际的方法调用:
这种风格及其类似于JdbcTemplate类的实现风格,通过提取一个公共的方法作为最底层,最通用的功能实现,然后用回调函数的不同来区分个性化的功能。
下面先分析通用代码的抽取:抽取了Connection,Session的创建以及关闭的通用处理方式。
接下来分析发送消息是如何实现的:首先execute中封装了冗余代码,个性化的代码放在了回调函数doInJms中进行实现,实现的方式大概步骤是通过Destination创建MessageProceducer,创建Message,并使用MessageProcedure实例来发送消息。
下面接下来来看接收消息的实现,套路与消息发送差不多,都是通过定义内部类,然后调用统一的execute方法然后再调用内部类中定义的doInJms进行个性化方法的实现。
(1)SimpleMessageListenerContainer,不支持事务,只能处理固定数量的JMS会话
(2)DefaultMessageListenerContainer,建立在SimpleMessageListenerContainer之上,支持事务
下面给出DefaultMessageListenerContainer的类层次结构图:
![](//img-blog.csdn.net/20180315141338718?watermark/2/text/Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDEzNDM2Nw==/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70)
初始化的工作仍然在InitializingBean接口的afterPropertiesSet方法中实现,其逻辑为:
(1)属性的验证,connectionFactory和destination属性是否为空
(2)初始化,由于可以创建多个Session和MessageConsumer来接收消息(最好在Destination类型为Queue才使用多个MessageConsumer,否则消息的处理可能会因为并发的原因而出现错误;不应该在Destination为Topic时使用多个MessageConsumer,其原因也是因为并发),也正是因为可以创建多个多个Session和MessageConsumer来接收消息,所以Spring的处理方式是通过创建对应数量的线程,每个线程都作为独立的接收者循环接受消息,而线程的实现类是AsyncMessageListenerInvoker,我们通过分析这个类的run方法可以知道,实际上该方法的流程分为了两个:
1.允许无限制的接收消息,2.需要控制消息的数量,其实两者的最终逻辑都是通过invokeListener方法接收消息并激活监听器,只不过允许无限制接收消息的不同就在于允许暂停线程,以及恢复线程。Spring在处理这个问题上是通过使用全局控制变量lifecycleMonitor的wait方法来暂停线程,并且通过lifecycleMonitor的notifyAll方法恢复线程。
接下来讨论消息接受的处理方式:当讨论DefaultMessageListenerContainer时由于存在事务管理所以没法复用模板中的方法,其中包括了1.接收消息,2.模板方法,调用子类的实现,3.激活监听器,4.处理空消息,我们针对其中的第三点进行重点说明,该方法中除了调用监听器的onMessage方法进行消息的传递,还有一个其他的要点就是消息服务的提交其实包含了两个事务,其中一个是通用事务,用于处理数据库提交等问题,其中第二个事务是消息事务,必须使用消息提交的方式告诉服务器本地已经正常接受消息,之后才可以将此消息删除,否则会被重新接受
承接上文activeMQ的使用,下面来进行源码分析。
JmsTemplate
在配置文件中我们定义了JmsTemplate,先来看看JmsTemplate的类层次结构:虽然出现了常见的InitializingBean接口,但是该接口实际上只是一个验证的功能。
下面来看看实际的方法调用:
public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException { this.execute(new SessionCallback<Object>() { public Object doInJms(Session session) throws JMSException { JmsTemplate.this.doSend(session, destination, messageCreator); return null; } }, false); }
这种风格及其类似于JdbcTemplate类的实现风格,通过提取一个公共的方法作为最底层,最通用的功能实现,然后用回调函数的不同来区分个性化的功能。
下面先分析通用代码的抽取:抽取了Connection,Session的创建以及关闭的通用处理方式。
接下来分析发送消息是如何实现的:首先execute中封装了冗余代码,个性化的代码放在了回调函数doInJms中进行实现,实现的方式大概步骤是通过Destination创建MessageProceducer,创建Message,并使用MessageProcedure实例来发送消息。
下面接下来来看接收消息的实现,套路与消息发送差不多,都是通过定义内部类,然后调用统一的execute方法然后再调用内部类中定义的doInJms进行个性化方法的实现。
监听器容器
Spring中提供的监听器容器一共分为以下三类:(1)SimpleMessageListenerContainer,不支持事务,只能处理固定数量的JMS会话
(2)DefaultMessageListenerContainer,建立在SimpleMessageListenerContainer之上,支持事务
下面给出DefaultMessageListenerContainer的类层次结构图:
初始化的工作仍然在InitializingBean接口的afterPropertiesSet方法中实现,其逻辑为:
(1)属性的验证,connectionFactory和destination属性是否为空
(2)初始化,由于可以创建多个Session和MessageConsumer来接收消息(最好在Destination类型为Queue才使用多个MessageConsumer,否则消息的处理可能会因为并发的原因而出现错误;不应该在Destination为Topic时使用多个MessageConsumer,其原因也是因为并发),也正是因为可以创建多个多个Session和MessageConsumer来接收消息,所以Spring的处理方式是通过创建对应数量的线程,每个线程都作为独立的接收者循环接受消息,而线程的实现类是AsyncMessageListenerInvoker,我们通过分析这个类的run方法可以知道,实际上该方法的流程分为了两个:
1.允许无限制的接收消息,2.需要控制消息的数量,其实两者的最终逻辑都是通过invokeListener方法接收消息并激活监听器,只不过允许无限制接收消息的不同就在于允许暂停线程,以及恢复线程。Spring在处理这个问题上是通过使用全局控制变量lifecycleMonitor的wait方法来暂停线程,并且通过lifecycleMonitor的notifyAll方法恢复线程。
接下来讨论消息接受的处理方式:当讨论DefaultMessageListenerContainer时由于存在事务管理所以没法复用模板中的方法,其中包括了1.接收消息,2.模板方法,调用子类的实现,3.激活监听器,4.处理空消息,我们针对其中的第三点进行重点说明,该方法中除了调用监听器的onMessage方法进行消息的传递,还有一个其他的要点就是消息服务的提交其实包含了两个事务,其中一个是通用事务,用于处理数据库提交等问题,其中第二个事务是消息事务,必须使用消息提交的方式告诉服务器本地已经正常接受消息,之后才可以将此消息删除,否则会被重新接受
相关文章推荐
- Spring源码与分析总结——RMI整合
- Spring源码分析总结——Mybatis的整合
- Spring&WEB整合原理及源码分析
- 分析spring源码第五(三)篇:Spring中Bean的解析、加载、创建 过程总结
- Spring整合JMS——基于ActiveMQ实现(附源码)
- spring boot实战(第十四篇)整合RabbitMQ源码分析前言
- Spring源码分析总结——容器的功能扩展
- Spring与Mybatis整合的MapperScannerConfigurer处理过程源码分析
- spring源码分析,重新认识spring三(总结,总结下 ioc 和 aop,同时提出疑惑)
- Spring与Mybatis整合的MapperScannerConfigurer处理过程源码分析
- Spring与Mybatis整合的MapperScannerConfigurer处理过程源码分析
- spring 源码分析及知识点总结
- Spring与Mybatis整合的MapperScannerConfigurer处理过程源码分析
- spring源码分析之spring-core总结篇
- ActiveMQ学习总结(3)——spring整合ActiveMQ
- spring 与hibernate 的 整合 源码分析
- Spring与Mybatis整合的MapperScannerConfigurer处理过程源码分析
- spring boot实战(第十四篇)整合RabbitMQ源码分析前言
- Spring Redis与ActiveMQ发布订阅模式源码分析
- Spring整合MyBatis(二)源码分析