rabbitMQ教程(三) spring整合rabbitMQ代码实例
2017-07-29 17:05
393 查看
一、开启rabbitMQ服务,导入MQ jar包和gson jar包(MQ默认的是jackson,但是效率不如Gson,所以我们用gson)
二、发送端配置,在spring配置文件中配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 连接服务配置 如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码 guest默认不允许远程登录--> <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" virtual-host="/" channel-cache-size="5" /> <!-- 配置爱admin,自动根据配置文件生成交换器和队列,无需手动配置 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- queue 队列声明 --> <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag" /> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 --> <bean id="jsonMessageConverter" class="sendMQ.Gson2JsonMessageConverter" /> <!-- spring template声明 --> <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange" routing-key="spring.queue.tag.key" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
发送端代码:GSON配置
package sendMQ; import java.io.IOException; import java.io.UnsupportedEncodingException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractJsonMessageConverter; import org.springframework.amqp.support.converter.ClassMapper; import org.springframework.amqp.support.converter.DefaultClassMapper; import org.springframework.amqp.support.converter.MessageConversionException; import com.google.gson.Gson; public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter{ private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class); private static ClassMapper classMapper = new DefaultClassMapper(); private static Gson gson = new Gson(); public Gson2JsonMessageConverter() { super(); } @Override protected Message createMessage(Object object, MessageProperties messageProperties) { byte[] bytes = null; try { String jsonString = gson.toJson(object); bytes = jsonString.getBytes(getDefaultCharset()); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(getDefaultCharset()); if (bytes != null) { messageProperties.setContentLength(bytes.length); } classMapper.fromClass(object.getClass(),messageProperties); return new Message(bytes, messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { Object content = null; MessageProperties properties = message.getMessageProperties(); if (properties != null) { String contentType = properties.getContentType(); if (contentType != null && contentType.contains("json")) { String encoding = properties.getContentEncoding(); if (encoding == null) { encoding = getDefaultCharset(); } try { Class<?> targetClass = getClassMapper().toClass( message.getMessageProperties()); content = convertBytesToObject(message.getBody(), encoding, targetClass); } catch (IOException e) { throw new MessageConversionException( "Failed to convert Message content", e); } } else { log.warn("Could not convert incoming message with content-type [" + contentType + "]"); } } if (content == null) { content = message.getBody(); } return content; } private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws UnsupportedEncodingException { String contentAsString = new String(body, encoding); return gson.fromJson(contentAsString, clazz); } }
发送类接口:
public interface MQProducer { /** * 发送消息到指定队列 * @param queueKey * @param object */ public void sendDataToQueue(String queueKey, Object object); }
实现类:test是测试用的。
package sendMQ; import java.util.HashMap; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.stereotype.Component; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(value = SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:/spring-common.xml"}) @Component public class MQProducerImpl implements MQProducer { @Autowired private AmqpTemplate amqpTemplate; @Override public void sendDataToQueue(String queueKey, Object object) { System.out.println("--"+amqpTemplate); try { amqpTemplate.convertAndSend(object); System.out.println("------------消息发送成功"); } catch (Exception e) { System.out.println(e); } } @Test public void test() { Map<String,Object> msg = new HashMap<>(); msg.put("data","hello,456"); while(true){ amqpTemplate.convertAndSend(msg); try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } } }
接收端配置:
<!-- 连接服务配置 --> <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" virtual-host="/" channel-cache-size="5" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 队列声明--> <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/> </rabbit:bindings> </rabbit:direct-exchange> <bean id="receiveMessageListener" class="receiveMQ.QueueListenter" /> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" > <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" /> </rabbit:listener-container>
接收端代码:
package receiveMQ; import java.io.UnsupportedEncodingException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class QueueListenter implements MessageListener{ @Override public void onMessage(Message msg) { try { System.out.print("-------------------"+new String(msg.getBody(),"UTF-8")); } catch (UnsupportedEncodingException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } }
接收端测试启动:
package receiveMQ; import org.springframework.context.support.ClassPathXmlApplicationContext; public class ConsumerMain { public static void main(String[] args) { new ClassPathXmlApplicationContext("spring-common.xml"); } }
上面代码均有注释,应该不难看懂,复制即可使用,实现了MQ的简单功能。
说明:可以配置多个接收端,spring默认的是负载均衡机制,每个接收端接收一条的来,这些扩展功能待后面有时间再讲解
相关文章推荐
- rabbitMQ教程(三) spring整合rabbitMQ代码实例
- spring整合rabbitMQ代码实例
- ssh框架搭建Struts2.06+spring2.5+hibernate3.2整合实例代码教程步骤
- SSH框架搭建Struts2.06+spring2.5+hibernate3.2整合实例代码教程步骤
- ssh框架搭建Struts2.06+spring2.5+hibernate3.2整合实例代码教程步骤
- Apache Shiro+SpringMVC+Hibernate Search+Hibernate+Bootstrap企业信息管理系统基础框架搭建整合实例代码教程
- Apache Shiro+SpringMVC+Hibernate Search+Hibernate+Bootstrap企业信息管理系统基础框架搭建整合实例代码教程
- 消息队列 RabbitMQ 与 Spring 整合使用的实例代码
- spring整合rabbitMQ代码实例
- spring boot整合spring-kafka实现发送接收消息实例代码
- SpringBoot非官方教程 | 第十五篇:Springboot整合RabbitMQ
- MQ消息队列--RabbitMQ整合Spring理论及实例讲解
- XFire+Spring整合构建Web Service的框架搭建实例代码下载
- springmvc+spring jdbc+velocity入门教程及其代码实例
- jbpm4整合struts2+spring2.5+hibernate3.3入门实例教程
- 详解MongoDB和Spring整合的实例代码
- Spring Boot整合mybatis(一)实例代码
- spring整合redis实现数据缓存的实例代码
- FreeMarker与SpringMVC整合实例代码教程
- 企业级 SpringBoot 教程 (十五)Springboot整合RabbitMQ