您的位置:首页 > 编程语言 > Java开发

[置顶] Spring整合JMS异步消息

2017-11-03 12:00 316 查看
Spring整合JMS异步消息
  

    在应用程序之间通信的消息,可分为同步消息和异步消息两种。前者就是当请求的程序端发出请求后,一直处于等待状态(阻塞),直到接收请求方反馈正确的结果后,请求方才能继续往下执行。而异步消息,则请求的程序端发出请求后,则可以继续向下执行,不需要阻塞当前流程,很多时候大大提高了用户的即时体验。当然,选择同步还是异步形式的通信,全凭应用的场景而做出合理的选择,比如:针对用户的即时操作要求较高的功能,则建议采用异步通信;而例如工作流或对即时性要求不高的功能实现,则可采用同步方式通信。

 

 

l  异步消息简介

l  消息模型简介

l  JMS异步消息

 

 

一、异步消息简介

让我们先看下同步消息的流程,如下所示:



如上图,同步消息就是当客户端请求发出后,一直处于等待请求响应服务的反馈,否则会一直等待(阻塞)下去,待服务响应方返回内容后,客户端才能继续按程序流往下执行。接下来,再对比下异步消息,它的消息流程模型与同步很相似,如下所示:



如上图,异步消息不同于同步消息的地方,主要就是当客户端发出请求到响应服务后,会继续按照程序流往下执行运作,此时,客户端不会被阻塞,也就是用户感觉不到被阻塞的体验。那么,当响应服务处理逻辑完成后,怎么才能找到是谁调取的它?一般我们会在调取的时候,传递一个绑定的回调标识进程,该进程会一直驻留等待,直到接收到服务响应的反馈才会被销毁,当然,也可以通过消息中间件,如:ActiveMQ或RabbitMQ等,先将请求消息放入中间件,待服务提供方完成处理后,从消息中间件中获取对应消息返还给客户端即可。

 

 

二、消息模型简介

上面介绍了同步及异步消息的流程及区别。接下来,我们一起看下现在通用的两种消息模型,分别是点对点模型(队列)和发布/订阅模型(主题)。

 

1、点对点模型

该模型中,每一个消息都有一个发送者和一个接收者,当消息代理得到消息时,它将其放入队列中,当接收者从队列中读取下一条消息时,消息会从队列中取出,并投递给接收者,此时对应的消息会从队列中删除,所以保证了消息只有一个接收者,如下所示:


 

 

2、发布/订阅模型

在发布-订阅消息模型中,消息会发送一个主题,而订阅了这个主题的所有接收者都会接收到此消息的副本。与队列类似,多个接收者可以监听同一个主题,不同的是消息不再只投递给一个接收者,所有订阅主题的接收者都可以收到消息副本,如下所示:


 

 

 

三、JMS异步消息

JMS,即为Java MessageService的简写,它是一个Java标准,定义了使用消息代理的通用API,类似于JDBC为数据库提供通用的接口一样。不过在Spring中提供了基于模版抽象实现JMS功能的支持,该模版就是JmsTemplate,使用它可以很方便地在消息生产方发送消息队列和主题消息,而接收方也能很方便地接收到消息。另外,Spring还提供了消息驱动POJO的概念,它是一个简单的java对象,其能够以异步的方式响应消息队列或主题上的消息。

 

1、准备工作

 

Maven配置:

     
 <dependency> 

       <groupId>org.springframework</groupId> 
       <artifactId>spring-jms</artifactId> 
       <version>4.3.2.RELEASE</version> 
       <scope>compile</scope> 

 </dependency> 

 <dependency> 

       <groupId>org.apache.activemq</groupId> 

       <artifactId>activemq-all</artifactId> 

       <version>5.9.0</version> 

       <scope>compile</scope> 

 </dependency>

 

 

A、在Spring中搭建一消息代理角色。

这里我们使用ActiveMQ作为消息代理角色,它是一个队列容器,可以很好地接收、存储、传递及管理消息,关于它的具体介绍,会在后续专题中介绍,这里主要介绍如何将其与Spring进行整合和使用。

首先,我们到ActiveMQ官网下载最新的软件包,地址:

http://activemq.apache.org/

 

然后,解压下载的二进制软件包,并切换到其下的bin目录下,执行activemq  start 命令启动这个消息代理,供后续发送消息到该队列中做准备,如果未启动,则连接发送消息时,会报出未连接队列错误。

 

B、建立一个连接工厂,通过它与消息代理进行直接交互。

接下来,我们需要一个与消息代理队列交互的连接工厂,这里我选用ActiveMQ自身提供的连接工厂工具ActiveMQConnectionFactory ,它可以很好地与Spring结合,如下Java Config配置(与XML类似):

@Bean
public
ActiveMQConnectionFactory connectionFactory() {    
       ActiveMQConnectionFactory mqConnectionFactory =
new ActiveMQConnectionFactory();
       mqConnectionFactory.setBrokerURL("tcp://localhost:61616"); 

       mqConnectionFactory.setUserName("root"); 

       mqConnectionFactory.setPassword("123456");
       return
mqConnectionFactory;
}

先建立一个ActiveMQConnectionFactory 实例,然后指定访问消息代理的地址:setBrokerURL ,而ActiveMQ默认的端口是61616。一般情况,消息代理组件都会有账号存在,所以需要设置访问的用户名和密码进行授权访问:setUserName 和setPassword 。

 

C、声明消息目的地:队列和主题

这里需声明一个ActiveMQ的点对点消息队列(ActiveMQQueue),和一个发布/订阅队列(ActiveMQTopic),因为它们指定了消息发送的队列类型和地址,具体配置如下:

ActiveMQQueue 配置:

 

@Bean
public ActiveMQQueue queue() {

    ActiveMQQueue mqQueue =
new
ActiveMQQueue();
    mqQueue.setPhysicalName("channel.test.queue");  

    return
mqQueue;
}

 

ActiveMQTopic 配置:

@Bean
public ActiveMQTopic topic() {

    ActiveMQTopic mqTopic =
new
ActiveMQTopic();
    mqTopic.setPhysicalName("channel.test.topic");  

    return
mqTopic;
}

 

如上所示,不论哪种类型的队列,我们都需要为其配置对应的Physical Name地址,因为它们是目的队列的引用地址。

 

 

D、自定义消息驱动,可以异步的接收消息

为了可以在接收端异步的接收消息,我们需要实现基于EJB的消息驱动模型,为了测试队列及主题两种方式的消息分发,这里建立了两个消息驱动,具体如下:

    @Bean
    public CMessageHandler messageHandler() {      

       return
new 
CMessageHandler();
    }
   
    @Bean
    public CMessageHandler2 messageHandler2() { 

       return
new 
CMessageHandler2();
    }

 

CMessageHandler.java:

public
class
CMessageHandler {
    public
void 
handleMessage(CMessage message) {
       System.out.println(message.getType() +
":" + message.getData());
    }
}

 

CMessageHandler2.java:

public
class
CMessageHandler2 {
    public
void 
handleMessage(CMessage message) {
       System.out.println(message.getType() +
":" + message.getData());
    }
}

 

需要注意的是这两个消息驱动POJO,在后面需要注入到消息监听适配器中,以实现异步接收消息的目的。

 

E、声明消息监听适配器,以实现异步接收消息

如上一步,我们已经自定义了两个消息驱动POJO模型,那么对应的也需要通过MessageListenerAdapter 监听方式,声明两个消息监听适配器,分别监听上面两个消息驱动,具体如下:

@Bean
public
MessageListenerAdapter messageListenerAdapter(){
       MessageListenerAdapter 
mlAdapter = new
MessageListenerAdapter();
       mlAdapter.setDelegate(messageHandler());
       mlAdapter.setDefaultListenerMethod("handleMessage");
       return
mlAdapter;
    }
   
@Bean
public
MessageListenerAdapter messageListenerAdapter2() {
       MessageListenerAdapter 
mlAdapter = new
MessageListenerAdapter();
       mlAdapter.setDelegate(messageHandler2());

       mlAdapter.setDefaultListenerMethod("handleMessage");
       return
mlAdapter;
    }

 

如上所示,通过setDelegate 指定了对应的两个消息驱动POJO模型,然后使用setDefaultListenerMethod 指定了监听的启动方法。

 

F、声明消息监听适配器容器,注入和管理监听适配器

接下来,我们需要针对队列(Queue)和主题(Topic),分别声明对应的两个监听适配器容器,具体如下:

@Bean
    public DefaultMessageListenerContainer queueMessageListenerAdapterContainer() {

       DefaultMessageListenerContainer container =
new DefaultMessageListenerContainer();
       container.setConnectionFactory(connectionFactory());

       container.setDestination(queue());

       container.setMessageListener(messageListenerAdapter());

       return
container;
    }
   
    @Bean
    public DefaultMessageListenerContainer queueMessageListenerAdapterContainer2() {  

       DefaultMessageListenerContainer container =
new DefaultMessageListenerContainer();
       container.setConnectionFactory(connectionFactory());

       container.setDestination(queue());

       container.setMessageListener(messageListenerAdapter2());
       return
container;
    }
   
    @Bean
    public DefaultMessageListenerContainer topicMessageListenerAdapterContainer() {

       DefaultMessageListenerContainer container =
new DefaultMessageListenerContainer();
       container.setConnectionFactory(connectionFactory());

       container.setDestination(topic());

       container.setMessageListener(messageListenerAdapter());

       return
container;
    }
   
    @Bean
    public DefaultMessageListenerContainer topicMessageListenerAdapterContainer2() {

       DefaultMessageListenerContainer container =
new DefaultMessageListenerContainer();
       container.setConnectionFactory(connectionFactory());
       container.setDestination(topic());

       container.setMessageListener(messageListenerAdapter2());
       return
container;
    }

如上所示,我们声明了两种类型的监听容器,分别为Queue和Topic,而每种容器分别声明两个监听容器,对应上面的两个监听适配器。

这里通过DefaultMessageListenerContainer 作为默认监听容器,使用setConnectionFactory 指定连接工厂,使用setDestination 绑定消息发送的目的地队列,并使用setMessageListener 设置对应的监听适配器。

 

G、使用Spring提供的JmsTemplate模版

在Spring中,已经提供了方便操作消息代理的工具JmsTemplate,通过它可以设置消息转发的模式(点对点或是发布/订阅),指定默认发送的消息目的地队列,及连接工厂。当然,为了保证传递的消息能够被正确的转换,我们可以指定消息转换器,如json格式转换,可以使用MappingJackson2MessageConverter ,可以这样配置:

@Bean
    public MappingJackson2MessageConverter messageConverter() {  

       return
new 
MappingJackson2MessageConverter();
    }

 

那么,下面罗列下两个JmsTemplate的配置,具体细节可以查看代码中的注释说明,具体如下:

@Bean
public JmsTemplate jmsQueueTemplate() {     
// 声明点对点队列JmsTemplate
       JmsTemplate jmsTemplate =
new JmsTemplate();
       jmsTemplate.setConnectionFactory(connectionFactory()); 
// 指定连接消息代理工厂
       jmsTemplate.setMessageConverter(messageConverter());   
// 指定默认的消息转换器
       jmsTemplate.setDefaultDestinationName("channel.test.queue");      
// 指定默认消息地址,针对目的地址不变情况
       jmsTemplate.setPubSubDomain(false);     
// 关闭发布/订阅模式
       return
jmsTemplate;
    }
   
@Bean
public JmsTemplate jmsTopicTemplate() {     
// 声明发布订阅队列JmsTemplate
       JmsTemplate jmsTemplate =
new JmsTemplate();
       jmsTemplate.setConnectionFactory(connectionFactory()); 
// 指定连接消息代理工厂
        jmsTemplate.setMessageConverter(messageConverter());   
// 指定默认的消息转换器
       jmsTemplate.setDefaultDestinationName("channel.test.topic");      
// 指定默认消息地址,针对目的地址不变情况
       jmsTemplate.setPubSubDomain(true);   
// 开启发布/订阅模式
       return
jmsTemplate;
    }

 

 

2、发送消息

A、建立一个消息业务服务

CMessageService.java:

public
interface
CMessageService {
    public
void 
sendMessage(CMessage message);
}

 

CMessageServiceImpl.java:

@Service("CMessageService")
public
class 
CMessageServiceImpl implements CMessageService {
    @Autowired
    JmsTemplate jmsQueueTemplate;
   
    @Autowired
    JmsTemplate jmsTopicTemplate;
   
    public
void
sendMessage(final CMessage
message) {
       JmsTemplate jmsTemplate =
null;
       if(message.getType().equals("queue")) {
           jmsTemplate =
jmsQueueTemplate;
       }else
if
(message.getType().equals("topic")) {
           jmsTemplate =
jmsTopicTemplate;
       }
      
       jmsTemplate.send(new MessageCreator() {     
// 缺省目的地地址
           public Message createMessage(Session
session) throws JMSException {
              return
session.createObjectMessage(message);   
// 创建消息
           }
       });
    }
}

 

 

B、建立一个发送消息控制器

MessageAction.java:

@RestController
@RequestMapping("/message")
public
class
MessageAction {
   @Autowired
   CMessageService messageService;
  
   @RequestMapping("/show")
   public ModelAndView sendPage() {
      return
new
ModelAndView("test/jms_api");
   }
  
   @RequestMapping(value="/sendMessage")
   public
void 
sendMessage(HttpServletRequest request)
throws Exception {
      String type =
request.getParameter("type");
      String data =
request.getParameter("data");
     
      CMessage message =
new
CMessage(); 
// 消息内容
      message.setType(type);
      message.setData(data);
      messageService.sendMessage(message);     
// 发送消息
   }
}

 

C、建立一个发送的jsp页面

jms_api.jsp(js部分):

<script
type="text/javascript">
           // 发送queue消息
           function sendQueueMessage(){
               var data = $.trim($("#taQueue").val());
              $.ajax({
                      url:'/xxx/message/sendMessage',
                      data:{type:'queue',data:data},
                      type:"get",
                      dataType:'json',
                      contentType:'application/json',
                      success:function(result){
                      }
              });
           }
          
           // 发送topic消息
           function sendTopicMessage(){
              var data = $.trim($("#taTopic").val());
              $.ajax({
                      url:'/xxx/message/sendMessage',
                      data:{type:'topic',data:data},
                      type:"get",
                      dataType:'json',
                      contentType:'application/json',
                      success:function(result){
                      }
              });
           }
       </script>

 

具体的效果如下:



 

 

3、接收消息

接收消息,则是通过上面声明的消息驱动POJO来处理的,其通过注入到消息监听器中,当有消息到队列的时候,监听器会即时从队列中获取消息,并将其传递给对应的消息驱动POJO来处理,并且是以异步的方式处理的消息。

 

A、点击发送Queue消息:



 

 

B、点击发送Topic消息:



 

 

如上所示,同样声明了两个监听者角色,Queue点对点模式时,接收者只收到一条消息,而Topic发布/订阅模式时,则注册订阅了这个主题的,都可以接收该主题消息,所以有两个接收者都收到消息。

 

 

 

 

由于作者水平有限,请在评论或(QQ: 245389109)不吝发言讨论,谢谢。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: