RabbitMQ > Spring AMQP
2015-09-19 23:28
387 查看
Spring AMQP Project将Spring的核心观念应用于基于AMQP协议的消息解决方案,它包括两部分:spring-amqp是基础抽象,spring-rabbit是基于RabbitMQ的实现。
特征:
1)提供了一个”模板”RabbitTemplate来发送和接收消息
2)使用Listener Container用于异步处理进来的消息
3)使用RabbitAdmin自动声明Queues、Exchanges和Bindings
下面结合Spring amqp官方的Quick Start(http://projects.spring.io/spring-amqp/#quick-start)解释其用法。
1.在pom.xml (maven)中引入spring-rabbit dependency
2.在对应project的src/main/resources/目录下添加spring-rabbit.xml(我本机目录是src/main/resources/spring/spring-rabbitmq.xml)
解释一下配置文件的内容。
1)连接管理
rabbit namespace方便地指定了rabbitmq server所在主机的Host、Port、用户名和密码,以及缓存Channel的初始值。
管理与RabbitMQ连接的核心组件是ConnectionFactory接口,该接口的职责是提供org.springframework.amqp.rabbit.connection.Connection的实例对象,Spring提供的ConnectionFactory接口的唯一实现是CachingConnectionFactory。默认的cache-mode是CHANNEL,channel-cache-size默认值是1。可以修改cache-mode为CONNECTION,然后使用connection-cache-size指定缓存connection的数目。
2)发送和接收消息的“模板”
rabbit:template声明了一个Spring Amqp中的用于发送和接收消息的高级抽象类Template的实例,connection-Facotry规定了该template使用的连接工厂,exchange指定消息将被发送到哪个exchange上,routing-key是该template所发送消息的默认routingKey,如果不指定routing-key的值,那么默认值就是空字符串。可以在tempate的ConvertAndSend方法中指定routingKey。
3)自动声明Queues、Exchanges和Bindings
接口AmqpAdmin定义了AMQP协议下针对Queue、Exchange和Binding的基础管理操作。RabbitMQ下实现AmqpAdmin接口的是RabbitAdmin,上面的rabbit:admin就是声明了一个RabbitAdmin的实例。
当CachingConnectionFactory cache-mode是CHANNEL(默认)的时候,RabbitAdmin对同一个Spring Context下的Queues、Exchange和Binding自动执行lazy declaration,这个declaration要直到a Connection is opened to the broker才进行。
rabbit namespace提供了一些元素方便queue(rabbit:queue)、exchange(如声明topic类型的exchange rabbit:topic-exchange)、binding(rabbit:binding)的声明。
其中,rabbit:binding的pattern属性则是topic exchange的具有通配符匹配方式的routingKey。
4)Asynchronous Consumer
当有消息发送到名为myQueue的Queue时,消息将会进入Id为foo的Bean的listen方法中处理。
3.将rabbit.xml加载到Spring Context
若是Spring MVC,则交给org.springframework.web.context.ContextLoaderListener来做,在web.xml声明listener和context-param:
4.sending messages & receiving messages
使用AmqpTemplate发送消息时,可以使用以下方法:
第三个方法的描述最准确,使用示例如下,
准确的描述了将把消息发送到名为”marketData.topic”的exchange上,使用的routingKey是”quotes.nasdaq.FOO”,内容体是12.34和一些消息属性。
可是通常情况下,我们要发送给exchange的内容并不是一个字符串那么简单,而是诸多信息的集合,比如类的实例。AmqpTemplate定义了几个委托给MessageConverter来发送和接收消息的方法。MessageConverter接口很简单,仅提供了两个方法:一个用来把Object实例转换为Message实例,另一个则是把一个Message实例转化为一个Object实例。
和发送相关的方法如下:
这些方法要比上面的send方法简单,因为不需要Message实例,取而代之的是MessageConverter负责把一个Object实例转化为Message body所需的byte array。
举个例子,把提现简化为只有两个操作的情形,一是减少余额,二是向用户发送短信,后者不用那么即时,所以把短信发送的相关内容放到mq里异步处理就行,这里的相关内容包括:手机号、短信内容、信息类型等,通常会自定义类:
使用amqpTemplate发送消息:
按照上面的配置,
消息将被发送到名为myExchange的exchange上,然后命中binding key “foo.*”,从而消息被送往myQueue队列;
消息接着被送到bean id为foo的listen方法,listen方法示例:
参考资料:
http://docs.spring.io/spring-amqp/docs/1.5.4.RELEASE/reference/html
特征:
1)提供了一个”模板”RabbitTemplate来发送和接收消息
2)使用Listener Container用于异步处理进来的消息
3)使用RabbitAdmin自动声明Queues、Exchanges和Bindings
下面结合Spring amqp官方的Quick Start(http://projects.spring.io/spring-amqp/#quick-start)解释其用法。
1.在pom.xml (maven)中引入spring-rabbit dependency
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.5.4.RELEASE</version> </dependency>
2.在对应project的src/main/resources/目录下添加spring-rabbit.xml(我本机目录是src/main/resources/spring/spring-rabbitmq.xml)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <rabbit:connection-factory id="connectionFactory" host="192.168.73.128" port="5672" username="tony" password="123" channel-cache-size="25"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange" routing-key="foo.bar"/> <rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue name="myQueue"/> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*"/> </rabbit:bindings> </rabbit:topic-exchange> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="foo" method="listen" queue-names="myQueue"/> </rabbit:listener-container> <bean id="foo" class="foo.Foo" /> </beans>
解释一下配置文件的内容。
1)连接管理
<rabbit:connection-factory id= "connectionFactory" host="192.168.73.128" port="5672" username="tony" password="123" channel-cache-size="25"/>
rabbit namespace方便地指定了rabbitmq server所在主机的Host、Port、用户名和密码,以及缓存Channel的初始值。
管理与RabbitMQ连接的核心组件是ConnectionFactory接口,该接口的职责是提供org.springframework.amqp.rabbit.connection.Connection的实例对象,Spring提供的ConnectionFactory接口的唯一实现是CachingConnectionFactory。默认的cache-mode是CHANNEL,channel-cache-size默认值是1。可以修改cache-mode为CONNECTION,然后使用connection-cache-size指定缓存connection的数目。
2)发送和接收消息的“模板”
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange" routing-key="foo.bar"/>
rabbit:template声明了一个Spring Amqp中的用于发送和接收消息的高级抽象类Template的实例,connection-Facotry规定了该template使用的连接工厂,exchange指定消息将被发送到哪个exchange上,routing-key是该template所发送消息的默认routingKey,如果不指定routing-key的值,那么默认值就是空字符串。可以在tempate的ConvertAndSend方法中指定routingKey。
3)自动声明Queues、Exchanges和Bindings
<rabbit:admin connection-factory="connectionFactory" /> <rabbit:queue name="myQueue" /> <rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> </rabbit:bindings> </rabbit:topic-exchange >
接口AmqpAdmin定义了AMQP协议下针对Queue、Exchange和Binding的基础管理操作。RabbitMQ下实现AmqpAdmin接口的是RabbitAdmin,上面的rabbit:admin就是声明了一个RabbitAdmin的实例。
当CachingConnectionFactory cache-mode是CHANNEL(默认)的时候,RabbitAdmin对同一个Spring Context下的Queues、Exchange和Binding自动执行lazy declaration,这个declaration要直到a Connection is opened to the broker才进行。
rabbit namespace提供了一些元素方便queue(rabbit:queue)、exchange(如声明topic类型的exchange rabbit:topic-exchange)、binding(rabbit:binding)的声明。
其中,rabbit:binding的pattern属性则是topic exchange的具有通配符匹配方式的routingKey。
4)Asynchronous Consumer
<rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener ref="foo" method="listen" queue-names="myQueue"/> </rabbit:listener-container >
当有消息发送到名为myQueue的Queue时,消息将会进入Id为foo的Bean的listen方法中处理。
3.将rabbit.xml加载到Spring Context
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "rabbit.xml");
若是Spring MVC,则交给org.springframework.web.context.ContextLoaderListener来做,在web.xml声明listener和context-param:
<listener> <description>spring监听器</description> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring/spring-rabbit.xml</param-value> </context-param>
4.sending messages & receiving messages
使用AmqpTemplate发送消息时,可以使用以下方法:
void send(Message message) throws AmqpException; void send(String routingKey, Message message) throws AmqpException; void send(String exchange, String routingKey, Message message) throws AmqpException;
第三个方法的描述最准确,使用示例如下,
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
准确的描述了将把消息发送到名为”marketData.topic”的exchange上,使用的routingKey是”quotes.nasdaq.FOO”,内容体是12.34和一些消息属性。
可是通常情况下,我们要发送给exchange的内容并不是一个字符串那么简单,而是诸多信息的集合,比如类的实例。AmqpTemplate定义了几个委托给MessageConverter来发送和接收消息的方法。MessageConverter接口很简单,仅提供了两个方法:一个用来把Object实例转换为Message实例,另一个则是把一个Message实例转化为一个Object实例。
public interface MessageConverter { Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException; Object fromMessage(Message message) throws MessageConversionException; }
和发送相关的方法如下:
void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String routingKey, Object message,MessagePostProcessor messagePostProcessor) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message,MessagePostProcessor messagePostProcessor) throws AmqpException;
这些方法要比上面的send方法简单,因为不需要Message实例,取而代之的是MessageConverter负责把一个Object实例转化为Message body所需的byte array。
举个例子,把提现简化为只有两个操作的情形,一是减少余额,二是向用户发送短信,后者不用那么即时,所以把短信发送的相关内容放到mq里异步处理就行,这里的相关内容包括:手机号、短信内容、信息类型等,通常会自定义类:
import java.io.Serializable; public class SMSMqMessage implements Serializable { private static final long serialVersionUID = 4434747210124038681L; private String mobile; //手机号 private String content; //短信内容 public SMSMqMessage() { } public SMSMqMessage(String mobile, String content) { this.mobile = mobile; this.content = content; } // set、get方法... }
使用amqpTemplate发送消息:
SMSMqMessage msg = new SMSMqMessage("15100001111","提现100元"); amqpTemplate.convertAndSend("myExchange", "foo.test", msg);
按照上面的配置,
<rabbit:topic-exchange name="myExchange"> <rabbit:bindings> <rabbit:binding queue="myQueue" pattern="foo.*" /> </rabbit:bindings> </rabbit:topic-exchange >
消息将被发送到名为myExchange的exchange上,然后命中binding key “foo.*”,从而消息被送往myQueue队列;
<rabbit:listener-container connection-factory="connectionFactory" > <rabbit:listener ref="foo" method="listen" queue-names="myQueue"/> </rabbit:listener-container >
消息接着被送到bean id为foo的listen方法,listen方法示例:
public void listen(SMSMqMessage msg){ String mobile = msg.getMobile(); String content = msg.getContent(); //调用短信服务发送短信 ... }
参考资料:
http://docs.spring.io/spring-amqp/docs/1.5.4.RELEASE/reference/html
相关文章推荐
- Java面向对象的多态
- Hashmap in java
- java异常处理的两种方法
- eclipse配置流程
- java:IO流学习小结
- Java:String字符串内存分析
- NetBeans实现计算器
- java集合框架之Map
- 各种排序算法的实现Java
- java调用存储过程mysql
- JAVA基础--db05_java数组常见功能-查表法
- 我尝试安装了jdk1.6,但是装过,检查ava版本仍然是1.3的解决
- java 字符串缓冲池 String缓冲池 == 和equals
- JavaEE系列之(一)JSP基础知识详解
- Java记录 -40- 定义常量最佳方法
- 解决java.net.MalformedURLException: unknown protocol: c问题
- 韩顺平 java 第十讲 重载、覆盖
- java正则表达式基础
- struts+hibernate+spring整合过程常见问题收集
- Java多线程基础知识(二)