spring整合消息队列rabbitmq
2015-10-14 22:40
711 查看
spring大家太熟,就不多说了
rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684
本文侧重介绍如何将rabbitmq整合到项目中
ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
1.首先是生产者配置
2.fastjson messageconver插件实现
3.生产者端调用
4.消费者端配置(与生产者端大同小异)
5.消费者端调用
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
rabbitmq一个amqp的队列服务实现,具体介绍请参考本文http://lynnkong.iteye.com/blog/1699684
本文侧重介绍如何将rabbitmq整合到项目中
ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
1.首先是生产者配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 连接服务配置 --> <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 队列声明 自动创建--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- exchange queue binging key 绑定 自动创建 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 --> <bean id="jsonMessageConverter" class="mq.convert.FastJsonMessageConverter"></bean> <-- spring template声明--> <rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/> </beans>
2.fastjson messageconver插件实现
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.AbstractMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import fe.json.FastJson; public class FastJsonMessageConverter extends AbstractMessageConverter { private static Log log = LogFactory.getLog(FastJsonMessageConverter.class); public static final String DEFAULT_CHARSET = "UTF-8"; private volatile String defaultCharset = DEFAULT_CHARSET; public FastJsonMessageConverter() { super(); //init(); } public void setDefaultCharset(String defaultCharset) { this.defaultCharset = (defaultCharset != null) ? defaultCharset : DEFAULT_CHARSET; } public Object fromMessage(Message message) throws MessageConversionException { return null; } public <T> T fromMessage(Message message,T t) { String json = ""; try { json = new String(message.getBody(),defaultCharset); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return (T) FastJson.fromJson(json, t.getClass()); } protected Message createMessage(Object objectToConvert, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; try { String jsonString = FastJson.toJson(objectToConvert); bytes = jsonString.getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(this.defaultCharset); if (bytes != null) { messageProperties.setContentLength(bytes.length); } return new Message(bytes, messageProperties); } }
3.生产者端调用
import java.util.List; import org.springframework.amqp.core.AmqpTemplate; public class MyMqGatway { @Autowired private AmqpTemplate amqpTemplate; public void sendDataToCrQueue(Object obj) { amqpTemplate.convertAndSend("queue_one_key", obj); } }
4.消费者端配置(与生产者端大同小异)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- 连接服务配置 --> <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest" password="guest" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <!-- queue 队列声明--> <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/> <!-- exchange queue binging key 绑定 --> <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange"> <rabbit:bindings> <rabbit:binding queue="queue_one" key="queue_one_key"/> </rabbit:bindings> </rabbit:direct-exchange> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor"> <rabbit:listener queues="queue_one" ref="queueOneLitener"/> </rabbit:listener-container> </beans>
5.消费者端调用
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class QueueOneLitener implements MessageListener{ @Override public void onMessage(Message message) { System.out.println(" data :" + message.getBody()); } }
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
相关文章推荐
- 动态执行的方法java
- Java反射
- null pointer at org.springframework.beans.PropertyEditorRegistrySupport
- 给出 《java学习笔记》 PDF格式
- 黑马程序员一一高级开发工具Eclipse
- Spring 容器初始化方法
- java缓存技术
- java判断一个字符串是否为空的方法总结
- 学习心得(Java语言)
- Java从入门到精通14-GridLayout网格布局
- springMVC的HandlerInterceptor拦截器
- 关于java中有符号数转换成无符号数的相关问题
- Java从入门到精通13-FlowLayout布局
- 抽象工厂模式(java语言实现)
- 《深入理解java虚拟机》之内存模型与安全
- Java内存泄露监控工具__JVM监控工具介绍
- 转换java keytools的keystore证书到OPENSSL的PEM格式文件
- java锁机制
- Eclipse操作技巧
- MyEclipse 查看JDK类库的源代码